From 019fe44dead3775d49ee64cfa378e9d843c77577 Mon Sep 17 00:00:00 2001 From: Alexander Nyurenberg Date: Sun, 19 Dec 2021 02:32:12 +0300 Subject: [PATCH] Added the ability to specify the exact number of users for spawning of each class with class field. Fixed users spawn first. The weight parameter for them is ignored. Also added unit-tests for cases with fixed users. --- locust/dispatch.py | 96 ++++++++++---- locust/test/test_dispatch.py | 237 ++++++++++++++++++++++++++++++++++- locust/user/users.py | 7 ++ 3 files changed, 317 insertions(+), 23 deletions(-) diff --git a/locust/dispatch.py b/locust/dispatch.py index dbd6aaa0ea..0848183931 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -4,7 +4,7 @@ import time from collections.abc import Iterator from operator import attrgetter -from typing import Dict, Generator, List, TYPE_CHECKING, Tuple, Type +from typing import Dict, Generator, List, TYPE_CHECKING, Optional, Tuple, Type import gevent import typing @@ -98,6 +98,10 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use self._rebalance = False + self._try_dispatch_fixed = True + + self._no_user_to_spawn = False + @property def dispatch_in_progress(self): return self._dispatch_in_progress @@ -132,6 +136,9 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]: if self._rebalance: self._rebalance = False yield self._users_on_workers + if self._no_user_to_spawn: + self._no_user_to_spawn = False + break while self._current_user_count > self._target_user_count: with self._wait_between_dispatch_iteration_context(): @@ -242,12 +249,17 @@ def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]: self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count ) for user in self._user_generator: + if not user: + self._no_user_to_spawn = True + break worker_node = next(self._worker_node_generator) self._users_on_workers[worker_node.id][user] += 1 self._current_user_count += 1 self._active_users.append((worker_node, user)) if self._current_user_count >= current_user_count_target: - return self._users_on_workers + break + + return self._users_on_workers def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]: """Remove users from the workers until the target number of users is reached for the current dispatch iteration @@ -264,9 +276,17 @@ def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]: return self._users_on_workers self._users_on_workers[worker_node.id][user] -= 1 self._current_user_count -= 1 + self._try_dispatch_fixed = True if self._current_user_count == 0 or self._current_user_count <= current_user_count_target: return self._users_on_workers + def _get_user_current_count(self, user: str) -> int: + count = 0 + for users_on_node in self._users_on_workers.values(): + count += users_on_node.get(user, 0) + + return count + def _distribute_users( self, target_user_count: int ) -> Tuple[dict, Generator[str, None, None], typing.Iterator["WorkerNode"], List[Tuple["WorkerNode", str]]]: @@ -307,26 +327,58 @@ def _user_gen(self) -> Generator[str, None, None]: weighted round-robin algorithm, we'd get AAAAABAAAAAB which would make the distribution less accurate during ramp-up/down. """ - # Normalize the weights so that the smallest weight will be equal to "target_min_weight". - # The value "2" was experimentally determined because it gave a better distribution especially - # when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc. - target_min_weight = 2 - min_weight = min(u.weight for u in self._user_classes) - normalized_weights = [ - (user_class.__name__, round(target_min_weight * user_class.weight / min_weight)) - for user_class in self._user_classes - ] - gen = smooth(normalized_weights) - # Instead of calling `gen()` for each user, we cycle through a generator of fixed-length - # `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because - # we only ever need to call `gen()` a relatively small number of times. The length of this generator - # is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is - # 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over - # until the target user count is reached. - generation_length_to_get_proper_distribution = sum( - normalized_weight[1] for normalized_weight in normalized_weights - ) - yield from itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution)) + + def infinite_cycle_gen(users: List[Tuple[User, int]]) -> Generator[Optional[str], None, None]: + if not users: + return itertools.cycle([None]) + + # Normalize the weights so that the smallest weight will be equal to "target_min_weight". + # The value "2" was experimentally determined because it gave a better distribution especially + # when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc. + target_min_weight = 2 + + # 'Value' here means weight or fixed count + normalized_values = [ + ( + user.__name__, + round(target_min_weight * value / min([u[1] for u in users])), + ) + for user, value in users + ] + generation_length_to_get_proper_distribution = sum( + normalized_val[1] for normalized_val in normalized_values + ) + gen = smooth(normalized_values) + # Instead of calling `gen()` for each user, we cycle through a generator of fixed-length + # `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because + # we only ever need to call `gen()` a relatively small number of times. The length of this generator + # is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is + # 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over + # until the target user count is reached. + return itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution)) + + fixed_users = {u.__name__: u for u in self._user_classes if u.fixed_count} + + cycle_fixed_gen = infinite_cycle_gen([(u, u.fixed_count) for u in fixed_users.values()]) + cycle_weighted_gen = infinite_cycle_gen([(u, u.weight) for u in self._user_classes if not u.fixed_count]) + + # Spawn users + while True: + if self._try_dispatch_fixed: + spawned_classes = set() + while True: + user_name = next(cycle_fixed_gen) + if not user_name: + break + if self._get_user_current_count(user_name) >= fixed_users[user_name].fixed_count: + spawned_classes.add(user_name) + else: + yield user_name + if len(spawned_classes) == len(fixed_users): + break + self._try_dispatch_fixed = False + + yield next(cycle_weighted_gen) @staticmethod def _fast_users_on_workers_copy(users_on_workers: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]: diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index f648132a05..fa9781dcb2 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -1,7 +1,7 @@ import time import unittest from operator import attrgetter -from typing import Dict +from typing import Dict, List, Tuple from locust import User from locust.dispatch import UsersDispatcher @@ -3291,6 +3291,241 @@ class User3(User): self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) +class TestRampUpUsersFromZeroWithFixed(unittest.TestCase): + class Case: + def __init__(self, fixed_counts: Tuple[int], weights: Tuple[int], target_user_count: int): + self.fixed_counts = fixed_counts + self.weights = weights + self.target_user_count = target_user_count + + def case_handler(self, cases: List[Case], expected: Dict[str, int], user_classes=List[User]): + self.assertEqual(len(cases), len(expected)) + + for case_num in range(len(cases)): + # Reset to defaul values + for user_class in user_classes: + user_class.weight, user_class.fixed_count = 1, 0 + + case = cases[case_num] + self.assertEqual( + len(case.fixed_counts) + len(case.weights), + len(user_classes), + msg="Invalid test case or user list.", + ) + + fixed_users_list = user_classes[: len(case.fixed_counts)] + weighted_users_list = user_classes[len(case.fixed_counts) :] + + for user, fixed_count in zip(fixed_users_list, case.fixed_counts): + user.fixed_count = fixed_count + + for user, weight in zip(weighted_users_list, case.weights): + user.weight = weight + + worker_node1 = WorkerNode("1") + + users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=case.target_user_count, spawn_rate=0.5) + users_dispatcher._wait_between_dispatch = 0 + + iterations = list(users_dispatcher) + self.assertDictEqual(iterations[-1]["1"], expected[case_num]) + + def test_ramp_up_2_weigted_user_with_1_fixed_user(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(), weights=(1, 1, 1), target_user_count=3), + self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=3), + self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=9), + self.Case(fixed_counts=(8,), weights=(1, 1), target_user_count=10), + self.Case(fixed_counts=(2,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(960,), weights=(1, 1), target_user_count=1000), + self.Case(fixed_counts=(9990,), weights=(1, 1), target_user_count=10000), + self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=100), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1}, + {"User1": 1, "User2": 1, "User3": 1}, + {"User1": 1, "User2": 4, "User3": 4}, + {"User1": 8, "User2": 1, "User3": 1}, + {"User1": 2, "User2": 499, "User3": 499}, + {"User1": 100, "User2": 450, "User3": 450}, + {"User1": 960, "User2": 20, "User3": 20}, + {"User1": 9990, "User2": 5, "User3": 5}, + {"User1": 100, "User2": 0, "User3": 0}, + ], + user_classes=[User1, User2, User3], + ) + + def test_ramp_up_various_count_weigted_and_fixed_users(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + class User4(User): + ... + + class User5(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(1, 1), weights=(1, 1, 1), target_user_count=5), + self.Case(fixed_counts=(5, 2), weights=(1, 1, 1), target_user_count=10), + self.Case(fixed_counts=(9, 1), weights=(5, 3, 2), target_user_count=20), + self.Case(fixed_counts=(996,), weights=(1, 1, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(500,), weights=(2, 1, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(250, 250), weights=(3, 1, 1), target_user_count=1000), + self.Case(fixed_counts=(1, 1, 1, 1), weights=(100,), target_user_count=1000), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 5, "User2": 2, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 9, "User2": 1, "User3": 5, "User4": 3, "User5": 2}, + {"User1": 996, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 500, "User2": 200, "User3": 100, "User4": 100, "User5": 100}, + {"User1": 250, "User2": 250, "User3": 300, "User4": 100, "User5": 100}, + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 996}, + ], + user_classes=[User1, User2, User3, User4, User5], + ) + + def test_ramp_up_only_fixed_users(self): + class User1(User): + ... + + class User2(User): + ... + + class User3(User): + ... + + class User4(User): + ... + + class User5(User): + ... + + self.case_handler( + cases=[ + self.Case(fixed_counts=(1, 1, 1, 1, 1), weights=(), target_user_count=5), + self.Case(fixed_counts=(13, 26, 39, 52, 1), weights=(), target_user_count=131), + self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=100), + self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=50), + ], + expected=[ + {"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1}, + {"User1": 13, "User2": 26, "User3": 39, "User4": 52, "User5": 1}, + {"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10}, + {"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10}, + ], + user_classes=[User1, User2, User3, User4, User5], + ) + + def test_ramp_up_ramp_down_and_rump_up_again_fixed(self): + for weights, fixed_counts in [ + [(1, 1, 1, 1, 1), (100, 100, 50, 50, 200)], + [(1, 1, 1, 1, 1), (100, 150, 50, 50, 0)], + [(1, 1, 1, 1, 1), (200, 100, 50, 0, 0)], + [(1, 1, 1, 1, 1), (200, 100, 0, 0, 0)], + [(1, 1, 1, 1, 1), (200, 0, 0, 0, 0)], + [(1, 1, 1, 1, 1), (0, 0, 0, 0, 0)], + ]: + + u1_weight, u2_weight, u3_weight, u4_weight, u5_weight = weights + u1_fixed_count, u2_fixed_count, u3_fixed_count, u4_fixed_count, u5_fixed_count = fixed_counts + + class User1(User): + weight = u1_weight + fixed_count = u1_fixed_count + + class User2(User): + weight = u2_weight + fixed_count = u2_fixed_count + + class User3(User): + weight = u3_weight + fixed_count = u3_fixed_count + + class User4(User): + weight = u4_weight + fixed_count = u4_fixed_count + + class User5(User): + weight = u5_weight + fixed_count = u5_fixed_count + + target_user_counts = [sum(fixed_counts), sum(fixed_counts) + 100] + down_counts = [0, max(min(fixed_counts) - 1, 0)] + user_classes = [User1, User2, User3, User4, User5] + + for worker_count in [3, 5, 9]: + workers = [WorkerNode(str(i + 1)) for i in range(worker_count)] + users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes) + + for down_to_count in down_counts: + for target_user_count in target_user_counts: + + # Ramp-up to go to `target_user_count` ######### + + users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + # Ramp-down to go to `down_to_count` + # and ensure what the fixed users was decreased too + + users_dispatcher.new_dispatch(target_user_count=down_to_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertNotEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + # Ramp-up go back to `target_user_count` and ensure + # what the fixed users return to their counts + + users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1) + users_dispatcher._wait_between_dispatch = 0 + + list(users_dispatcher) + + for user_class in user_classes: + if user_class.fixed_count: + self.assertEqual( + users_dispatcher._get_user_current_count(user_class.__name__), + user_class.fixed_count, + ) + + def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]: user_classes = list(next(iter(d.values())).keys()) return {u: sum(d[u] for d in d.values()) for u in user_classes} diff --git a/locust/user/users.py b/locust/user/users.py index db924faf92..178f41ce79 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -97,6 +97,13 @@ class ForumPage(TaskSet): weight = 1 """Probability of user class being chosen. The higher the weight, the greater the chance of it being chosen.""" + fixed_count = 0 + """ + If the value > 0, the weight property will be ignored and the users will be spawned. + These users are spawed first. If target count is not enougth to spawn all users, + the final count of each user is undefined. + """ + abstract = True """If abstract is True, the class is meant to be subclassed, and locust will not spawn users of this class during a test."""