diff --git a/locust/dispatch.py b/locust/dispatch.py index dbd6aaa0ea..0848183931 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -4,7 +4,7 @@ import time from collections.abc import Iterator from operator import attrgetter -from typing import Dict, Generator, List, TYPE_CHECKING, Tuple, Type +from typing import Dict, Generator, List, TYPE_CHECKING, Optional, Tuple, Type import gevent import typing @@ -98,6 +98,10 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use self._rebalance = False + self._try_dispatch_fixed = True + + self._no_user_to_spawn = False + @property def dispatch_in_progress(self): return self._dispatch_in_progress @@ -132,6 +136,9 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]: if self._rebalance: self._rebalance = False yield self._users_on_workers + if self._no_user_to_spawn: + self._no_user_to_spawn = False + break while self._current_user_count > self._target_user_count: with self._wait_between_dispatch_iteration_context(): @@ -242,12 +249,17 @@ def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]: self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count ) for user in self._user_generator: + if not user: + self._no_user_to_spawn = True + break worker_node = next(self._worker_node_generator) self._users_on_workers[worker_node.id][user] += 1 self._current_user_count += 1 self._active_users.append((worker_node, user)) if self._current_user_count >= current_user_count_target: - return self._users_on_workers + break + + return self._users_on_workers def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]: """Remove users from the workers until the target number of users is reached for the current dispatch iteration @@ -264,9 +276,17 @@ def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]: return self._users_on_workers self._users_on_workers[worker_node.id][user] -= 1 self._current_user_count -= 1 + self._try_dispatch_fixed = True if self._current_user_count == 0 or self._current_user_count <= current_user_count_target: return self._users_on_workers + def _get_user_current_count(self, user: str) -> int: + count = 0 + for users_on_node in self._users_on_workers.values(): + count += users_on_node.get(user, 0) + + return count + def _distribute_users( self, target_user_count: int ) -> Tuple[dict, Generator[str, None, None], typing.Iterator["WorkerNode"], List[Tuple["WorkerNode", str]]]: @@ -307,26 +327,58 @@ def _user_gen(self) -> Generator[str, None, None]: weighted round-robin algorithm, we'd get AAAAABAAAAAB which would make the distribution less accurate during ramp-up/down. """ - # Normalize the weights so that the smallest weight will be equal to "target_min_weight". - # The value "2" was experimentally determined because it gave a better distribution especially - # when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc. - target_min_weight = 2 - min_weight = min(u.weight for u in self._user_classes) - normalized_weights = [ - (user_class.__name__, round(target_min_weight * user_class.weight / min_weight)) - for user_class in self._user_classes - ] - gen = smooth(normalized_weights) - # Instead of calling `gen()` for each user, we cycle through a generator of fixed-length - # `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because - # we only ever need to call `gen()` a relatively small number of times. The length of this generator - # is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is - # 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over - # until the target user count is reached. - generation_length_to_get_proper_distribution = sum( - normalized_weight[1] for normalized_weight in normalized_weights - ) - yield from itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution)) + + def infinite_cycle_gen(users: List[Tuple[User, int]]) -> Generator[Optional[str], None, None]: + if not users: + return itertools.cycle([None]) + + # Normalize the weights so that the smallest weight will be equal to "target_min_weight". + # The value "2" was experimentally determined because it gave a better distribution especially + # when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc. + target_min_weight = 2 + + # 'Value' here means weight or fixed count + normalized_values = [ + ( + user.__name__, + round(target_min_weight * value / min([u[1] for u in users])), + ) + for user, value in users + ] + generation_length_to_get_proper_distribution = sum( + normalized_val[1] for normalized_val in normalized_values + ) + gen = smooth(normalized_values) + # Instead of calling `gen()` for each user, we cycle through a generator of fixed-length + # `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because + # we only ever need to call `gen()` a relatively small number of times. The length of this generator + # is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is + # 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over + # until the target user count is reached. + return itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution)) + + fixed_users = {u.__name__: u for u in self._user_classes if u.fixed_count} + + cycle_fixed_gen = infinite_cycle_gen([(u, u.fixed_count) for u in fixed_users.values()]) + cycle_weighted_gen = infinite_cycle_gen([(u, u.weight) for u in self._user_classes if not u.fixed_count]) + + # Spawn users + while True: + if self._try_dispatch_fixed: + spawned_classes = set() + while True: + user_name = next(cycle_fixed_gen) + if not user_name: + break + if self._get_user_current_count(user_name) >= fixed_users[user_name].fixed_count: + spawned_classes.add(user_name) + else: + yield user_name + if len(spawned_classes) == len(fixed_users): + break + self._try_dispatch_fixed = False + + yield next(cycle_weighted_gen) @staticmethod def _fast_users_on_workers_copy(users_on_workers: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]: diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index f648132a05..e227570285 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -1,7 +1,7 @@ import time import unittest from operator import attrgetter -from typing import Dict +from typing import Dict, List, Tuple from locust import User from locust.dispatch import UsersDispatcher @@ -3291,6 +3291,239 @@ class User3(User): self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) +class TestRampUpUsersFromZeroWithFixed(unittest.TestCase): + class Case: + def __init__(self, fixed_counts: Tuple[int], weights: Tuple[int], target_user_count: int) -> None: + self.fixed_counts = fixed_counts + self.weights = weights + self.target_user_count = target_user_count + + def case_handler(self, cases: List[Case], expected: Dict[str, int], user_classes=List[User]): + assert len(cases) == len(expected), "Different len of 'cases' and 'expected'" + + for case_num in range(len(cases)): + # Reset to defaul values + for user_class in user_classes: + user_class.weight, user_class.fixed_count = 1, 0 + + case = cases[case_num] + assert (len(case.fixed_counts) + len(case.weights)) == len( + user_classes + ), "Different number of values and user classes, unable to match" + + fixed_users_list = user_classes[: len(case.fixed_counts)] + weighted_users_list = user_classes[len(case.fixed_counts) :] + + for user, fixed_count in zip(fixed_users_list, case.fixed_counts): + user.fixed_count = fixed_count + + for user, weight in zip(weighted_users_list, case.weights): + user.weight = weight + + worker_node1 = WorkerNode("1") + + users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=case.target_user_count, spawn_rate=0.5) + users_dispatcher._wait_between_dispatch = 0 + + iterations = list(users_dispatcher) + self.assertDictEqual(iterations[-1]["1"], expected[case_num]) + + def test_ramp_up_2_weigted_user_with_1_fixed_user(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(), weights=(1, 1, 1), target_user_count=3), + self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=3), + self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=9), + self.Case(fixed_counts=(8,), weights=(1, 1), target_user_count=10), + self.Case(fixed_counts=(2,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(960,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(9990,), weights=(1, 1), target_user_count=10000), + self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=100), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1}, + {"User1": 1, "User2": 1, "User3": 1}, + {"User1": 1, "User2": 4, "User3": 4}, + {"User1": 8, "User2": 1, "User3": 1}, + {"User1": 2, "User2": 499, "User3": 499}, + {"User1": 100, "User2": 450, "User3": 450}, + {"User1": 960, "User2": 20, "User3": 20}, + {"User1": 9990, "User2": 5, "User3": 5}, + {"User1": 100, "User2": 0, "User3": 0}, + ], + user_classes=[User1, User2, User3], + ) + + def test_ramp_up_various_count_weigted_and_fixed_users(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + class User4(User): + ... + + class User5(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(1, 1), weights=(1, 1, 1), target_user_count=5), + self.Case(fixed_counts=(5, 2), weights=(1, 1, 1), target_user_count=10), + self.Case(fixed_counts=(9, 1), weights=(5, 3, 2), target_user_count=20), + self.Case(fixed_counts=(996,), weights=(1, 1, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(500,), weights=(2, 1, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(250, 250), weights=(3, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(1, 1, 1, 1), weights=(100,), target_user_count=1000), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 5, "User2": 2, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 9, "User2": 1, "User3": 5, "User4": 3, "User5": 2}, + {"User1": 996, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 500, "User2": 200, "User3": 100, "User4": 100, "User5": 100}, + {"User1": 250, "User2": 250, "User3": 300, "User4": 100, "User5": 100}, + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 996}, + ], + user_classes=[User1, User2, User3, User4, User5], + ) + + def test_ramp_up_only_fixed_users(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + class User4(User): + ... + + class User5(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(1, 1, 1, 1, 1), weights=(), target_user_count=5), + self.Case(fixed_counts=(13, 26, 39, 52, 1), weights=(), target_user_count=131), + self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=100), + self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=50), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 13, "User2": 26, "User3": 39, "User4": 52, "User5": 1}, + {"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10}, + {"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10}, + ], + user_classes=[User1, User2, User3, User4, User5], + ) + + def test_ramp_up_ramp_down_and_rump_up_again_fixed(self): + for weights, fixed_counts in [ + [(1, 1, 1, 1, 1), (100, 100, 50, 50, 200)], + [(1, 1, 1, 1, 1), (100, 150, 50, 50, 0)], + [(1, 1, 1, 1, 1), (200, 100, 50, 0, 0)], + [(1, 1, 1, 1, 1), (200, 100, 0, 0, 0)], + [(1, 1, 1, 1, 1), (200, 0, 0, 0, 0)], + [(1, 1, 1, 1, 1), (0, 0, 0, 0, 0)], + ]: + + u1_weight, u2_weight, u3_weight, u4_weight, u5_weight = weights + u1_fixed_count, u2_fixed_count, u3_fixed_count, u4_fixed_count, u5_fixed_count = fixed_counts + + class User1(User): + weight = u1_weight + fixed_count = u1_fixed_count + + class User2(User): + weight = u2_weight + fixed_count = u2_fixed_count + + class User3(User): + weight = u3_weight + fixed_count = u3_fixed_count + + class User4(User): + weight = u4_weight + fixed_count = u4_fixed_count + + class User5(User): + weight = u5_weight + fixed_count = u5_fixed_count + + target_user_counts = [sum(fixed_counts), sum(fixed_counts) + 100] + down_counts = [0, max(min(fixed_counts) - 1, 0)] + user_classes = [User1, User2, User3, User4, User5] + + for worker_count in [3, 5, 9]: + workers = [WorkerNode(str(i + 1)) for i in range(worker_count)] + users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes) + + for down_to_count in down_counts: + for target_user_count in target_user_counts: + + # Ramp-up to go to `target_user_count` ######### + + users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + # Ramp-down to go to `down_to_count` + # and ensure what the fixed users was decreased too + + users_dispatcher.new_dispatch(target_user_count=down_to_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertNotEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + # Ramp-up go back to `target_user_count` and ensure + # what the fixed users return to their counts + + users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]: user_classes = list(next(iter(d.values())).keys()) return {u: sum(d[u] for d in d.values()) for u in user_classes} diff --git a/locust/user/users.py b/locust/user/users.py index db924faf92..178f41ce79 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -97,6 +97,13 @@ class ForumPage(TaskSet): weight = 1 """Probability of user class being chosen. The higher the weight, the greater the chance of it being chosen.""" + fixed_count = 0 + """ + If the value > 0, the weight property will be ignored and the users will be spawned. + These users are spawed first. If target count is not enougth to spawn all users, + the final count of each user is undefined. + """ + abstract = True """If abstract is True, the class is meant to be subclassed, and locust will not spawn users of this class during a test."""