From fb2a1ba1480264a4be3155c477e3d64f8431e5cf Mon Sep 17 00:00:00 2001 From: Alexander Nyurenberg Date: Sat, 15 Jan 2022 00:45:38 +0300 Subject: [PATCH] Changing realization of the fixed users spawning. Because in case then UserDispatcher._users_on_workers is not fills instantly (see UserDispatcher._distribute_users) we cant to monitore actual count of each user. Also added some additional tests, includes to check this behaviour. --- locust/dispatch.py | 24 +- locust/test/test_dispatch.py | 461 ++++++++++++----------------------- locust/test/test_runners.py | 96 ++++++++ 3 files changed, 274 insertions(+), 307 deletions(-) diff --git a/locust/dispatch.py b/locust/dispatch.py index 8e7bf5b4e7..6f72bce6ac 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -248,6 +248,7 @@ def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]: current_user_count_target = min( self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count ) + for user in self._user_generator: if not user: self._no_user_to_spawn = True @@ -309,6 +310,8 @@ def _distribute_users( user_count = 0 while user_count < target_user_count: user = next(user_gen) + if not user: + break worker_node = next(worker_gen) users_on_workers[worker_node.id][user] += 1 user_count += 1 @@ -366,18 +369,25 @@ def infinite_cycle_gen(users: List[Tuple[User, int]]) -> Generator[Optional[str] # Spawn users while True: if self._try_dispatch_fixed: + self._try_dispatch_fixed = False + current_fixed_users_count = {u: self._get_user_current_count(u) for u in fixed_users} spawned_classes = set() - while True: + while len(spawned_classes) != len(fixed_users): user_name = next(cycle_fixed_gen) if not user_name: break - if self._get_user_current_count(user_name) >= fixed_users[user_name].fixed_count: - spawned_classes.add(user_name) - else: + + if current_fixed_users_count[user_name] < fixed_users[user_name].fixed_count: + current_fixed_users_count[user_name] += 1 + if current_fixed_users_count[user_name] == fixed_users[user_name].fixed_count: + spawned_classes.add(user_name) yield user_name - if len(spawned_classes) == len(fixed_users): - break - self._try_dispatch_fixed = False + + # 'self._try_dispatch_fixed' was changed outhere, we have to recalculate current count + if self._try_dispatch_fixed: + current_fixed_users_count = {u: self._get_user_current_count(u) for u in fixed_users} + spawned_classes.clear() + self._try_dispatch_fixed = False yield next(cycle_weighted_gen) diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index 84ceade1eb..d9401a9c6c 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -1932,337 +1932,168 @@ class User2(User): class TestLargeScale(unittest.TestCase): - class User01(User): - weight = 5 - - class User02(User): - weight = 55 - - class User03(User): - weight = 37 - - class User04(User): - weight = 2 - - class User05(User): - weight = 97 - - class User06(User): - weight = 41 - - class User07(User): - weight = 33 - - class User08(User): - weight = 19 - - class User09(User): - weight = 19 - - class User10(User): - weight = 34 - - class User11(User): - weight = 78 - - class User12(User): - weight = 76 - - class User13(User): - weight = 28 - - class User14(User): - weight = 62 - - class User15(User): - weight = 69 - - class User16(User): - weight = 5 - - class User17(User): - weight = 55 - - class User18(User): - weight = 37 - - class User19(User): - weight = 2 - - class User20(User): - weight = 97 - - class User21(User): - weight = 41 - - class User22(User): - weight = 33 - - class User23(User): - weight = 19 - - class User24(User): - weight = 19 - - class User25(User): - weight = 34 - - class User26(User): - weight = 78 - - class User27(User): - weight = 76 - - class User28(User): - weight = 28 - - class User29(User): - weight = 62 - - class User30(User): - weight = 69 - - class User31(User): - weight = 41 - - class User32(User): - weight = 33 - - class User33(User): - weight = 19 - - class User34(User): - weight = 19 - - class User35(User): - weight = 34 - - class User36(User): - weight = 78 - - class User37(User): - weight = 76 - - class User38(User): - weight = 28 - - class User39(User): - weight = 62 - - class User40(User): - weight = 69 - - class User41(User): - weight = 41 - - class User42(User): - weight = 33 - - class User43(User): - weight = 19 - - class User44(User): - weight = 19 - - class User45(User): - weight = 34 - - class User46(User): - weight = 78 - - class User47(User): - weight = 76 - - class User48(User): - weight = 28 - - class User49(User): - weight = 62 - - class User50(User): - weight = 69 - - user_classes = [ - User01, - User02, - User03, - User04, - User05, - User06, - User07, - User08, - User09, - User10, - User11, - User12, - User13, - User14, - User15, - User16, - User17, - User18, - User19, - User20, - User21, - User22, - User23, - User24, - User25, - User26, - User27, - User28, - User29, - User30, - User31, - User32, - User33, - User34, - User35, - User36, - User37, - User38, - User39, - User40, - User41, - User42, - User43, - User44, - User45, - User46, - User47, - User48, - User49, - User50, + # fmt: off + weights = [ + 5, 55, 37, 2, 97, 41, 33, 19, 19, 34, 78, 76, 28, 62, 69, 5, 55, 37, 2, 97, 41, 33, 19, 19, 34, + 78, 76, 28, 62, 69, 41, 33, 19, 19, 34, 78, 76, 28, 62, 69, 41, 33, 19, 19, 34, 78, 76, 28, 62, 69 ] + # fmt: on + numerated_weights = dict(zip(range(len(weights)), weights)) + + weighted_user_classes = [type(f"User{i}", (User,), {"weight": w}) for i, w in numerated_weights.items()] + fixed_user_classes_10k = [type(f"FixedUser10k{i}", (User,), {"fixed_count": 2000}) for i in range(50)] + fixed_user_classes_1M = [type(f"FixedUser1M{i}", (User,), {"fixed_count": 20000}) for i in range(50)] + mixed_users = weighted_user_classes[:25] + fixed_user_classes_10k[25:] def test_distribute_users(self): - workers = [WorkerNode(str(i)) for i in range(10_000)] + for user_classes in [self.weighted_user_classes, self.fixed_user_classes_1M, self.mixed_users]: + workers = [WorkerNode(str(i)) for i in range(10_000)] - target_user_count = 1_000_000 + target_user_count = 1_000_000 - users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=self.user_classes) + users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes) - ts = time.perf_counter() - users_on_workers, user_gen, worker_gen, active_users = users_dispatcher._distribute_users( - target_user_count=target_user_count - ) - delta = time.perf_counter() - ts + ts = time.perf_counter() + users_on_workers, user_gen, worker_gen, active_users = users_dispatcher._distribute_users( + target_user_count=target_user_count + ) + delta = time.perf_counter() - ts - # Because tests are run with coverage, the code will be slower. - # We set the pass criterion to 5000ms, but in real life, the - # `_distribute_users` method runs faster than this. - self.assertLessEqual(1000 * delta, 5000) + # Because tests are run with coverage, the code will be slower. + # We set the pass criterion to 5000ms, but in real life, the + # `_distribute_users` method runs faster than this. + self.assertLessEqual(1000 * delta, 5000) - self.assertEqual(_user_count(users_on_workers), target_user_count) + self.assertEqual(_user_count(users_on_workers), target_user_count) def test_ramp_up_from_0_to_100_000_users_with_50_user_classes_and_1000_workers_and_5000_spawn_rate(self): - workers = [WorkerNode(str(i)) for i in range(1000)] + for user_classes in [ + self.weighted_user_classes, + self.fixed_user_classes_1M, + self.fixed_user_classes_10k, + self.mixed_users, + ]: + workers = [WorkerNode(str(i)) for i in range(1000)] - target_user_count = 100_000 + target_user_count = 100_000 - users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=self.user_classes) - users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=5_000) - users_dispatcher._wait_between_dispatch = 0 + users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=5_000) + users_dispatcher._wait_between_dispatch = 0 - all_dispatched_users = list(users_dispatcher) + all_dispatched_users = list(users_dispatcher) - tol = 0.2 - self.assertTrue( - all( - dispatch_iteration_duration <= tol - for dispatch_iteration_duration in users_dispatcher.dispatch_iteration_durations - ), - "One or more dispatch took more than {:.0f}s to compute (max = {}ms)".format( - tol * 1000, 1000 * max(users_dispatcher.dispatch_iteration_durations) - ), - ) - - self.assertEqual(_user_count(all_dispatched_users[-1]), target_user_count) - - for dispatch_users in all_dispatched_users: - user_count_on_workers = [sum(user_classes_count.values()) for user_classes_count in dispatch_users.values()] - self.assertLessEqual( - max(user_count_on_workers) - min(user_count_on_workers), - 1, - "One or more workers have too much users compared to the other workers when user count is {}".format( - _user_count(dispatch_users) + tol = 0.2 + self.assertTrue( + all( + dispatch_iteration_duration <= tol + for dispatch_iteration_duration in users_dispatcher.dispatch_iteration_durations + ), + "One or more dispatch took more than {:.0f}s to compute (max = {}ms)".format( + tol * 1000, 1000 * max(users_dispatcher.dispatch_iteration_durations) ), ) - for i, dispatch_users in enumerate(all_dispatched_users): - aggregated_dispatched_users = _aggregate_dispatched_users(dispatch_users) - for user_class in self.user_classes: - target_relative_weight = user_class.weight / sum(map(attrgetter("weight"), self.user_classes)) - relative_weight = aggregated_dispatched_users[user_class.__name__] / _user_count(dispatch_users) - error_percent = 100 * (relative_weight - target_relative_weight) / target_relative_weight - if i == len(all_dispatched_users) - 1: - # We want the distribution to be as good as possible at the end of the ramp-up - tol = 0.5 - else: - tol = 15 + self.assertEqual(_user_count(all_dispatched_users[-1]), target_user_count) + + for dispatch_users in all_dispatched_users: + user_count_on_workers = [ + sum(user_classes_count.values()) for user_classes_count in dispatch_users.values() + ] self.assertLessEqual( - error_percent, - tol, - "Distribution for user class {} is off by more than {}% when user count is {}".format( - user_class, tol, _user_count(dispatch_users) + max(user_count_on_workers) - min(user_count_on_workers), + 1, + "One or more workers have too much users compared to the other workers when user count is {}".format( + _user_count(dispatch_users) ), ) + for i, dispatch_users in enumerate(all_dispatched_users): + aggregated_dispatched_users = _aggregate_dispatched_users(dispatch_users) + for user_class in [u for u in user_classes if not u.fixed_count]: + target_relative_weight = user_class.weight / sum( + map(attrgetter("weight"), [u for u in user_classes if not u.fixed_count]) + ) + relative_weight = aggregated_dispatched_users[user_class.__name__] / _user_count(dispatch_users) + error_percent = 100 * (relative_weight - target_relative_weight) / target_relative_weight + if i == len(all_dispatched_users) - 1: + # We want the distribution to be as good as possible at the end of the ramp-up + tol = 0.5 + else: + tol = 15 + self.assertLessEqual( + error_percent, + tol, + "Distribution for user class {} is off by more than {}% when user count is {}".format( + user_class, tol, _user_count(dispatch_users) + ), + ) + def test_ramp_down_from_100_000_to_0_users_with_50_user_classes_and_1000_workers_and_5000_spawn_rate(self): - initial_user_count = 100_000 + for user_classes in [ + self.weighted_user_classes, + self.fixed_user_classes_1M, + self.fixed_user_classes_10k, + self.mixed_users, + ]: + initial_user_count = 100_000 - workers = [WorkerNode(str(i)) for i in range(1000)] + workers = [WorkerNode(str(i)) for i in range(1000)] - # Ramp-up - users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=self.user_classes) - users_dispatcher.new_dispatch(target_user_count=initial_user_count, spawn_rate=initial_user_count) - users_dispatcher._wait_between_dispatch = 0 - list(users_dispatcher) + # Ramp-up + users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=initial_user_count, spawn_rate=initial_user_count) + users_dispatcher._wait_between_dispatch = 0 + list(users_dispatcher) - # Ramp-down - users_dispatcher.new_dispatch(target_user_count=0, spawn_rate=5000) - users_dispatcher._wait_between_dispatch = 0 + # Ramp-down + users_dispatcher.new_dispatch(target_user_count=0, spawn_rate=5000) + users_dispatcher._wait_between_dispatch = 0 - all_dispatched_users = list(users_dispatcher) + all_dispatched_users = list(users_dispatcher) - tol = 0.2 - self.assertTrue( - all( - dispatch_iteration_duration <= tol - for dispatch_iteration_duration in users_dispatcher.dispatch_iteration_durations - ), - "One or more dispatch took more than {:.0f}ms to compute (max = {}ms)".format( - tol * 1000, 1000 * max(users_dispatcher.dispatch_iteration_durations) - ), - ) - - self.assertEqual(_user_count(all_dispatched_users[-1]), 0) - - for dispatch_users in all_dispatched_users[:-1]: - user_count_on_workers = [sum(user_classes_count.values()) for user_classes_count in dispatch_users.values()] - self.assertLessEqual( - max(user_count_on_workers) - min(user_count_on_workers), - 1, - "One or more workers have too much users compared to the other workers when user count is {}".format( - _user_count(dispatch_users) + tol = 0.2 + self.assertTrue( + all( + dispatch_iteration_duration <= tol + for dispatch_iteration_duration in users_dispatcher.dispatch_iteration_durations + ), + "One or more dispatch took more than {:.0f}ms to compute (max = {}ms)".format( + tol * 1000, 1000 * max(users_dispatcher.dispatch_iteration_durations) ), ) - for dispatch_users in all_dispatched_users[:-1]: - aggregated_dispatched_users = _aggregate_dispatched_users(dispatch_users) - for user_class in self.user_classes: - target_relative_weight = user_class.weight / sum(map(attrgetter("weight"), self.user_classes)) - relative_weight = aggregated_dispatched_users[user_class.__name__] / _user_count(dispatch_users) - error_percent = 100 * (relative_weight - target_relative_weight) / target_relative_weight - tol = 15 + self.assertEqual(_user_count(all_dispatched_users[-1]), 0) + + for dispatch_users in all_dispatched_users[:-1]: + user_count_on_workers = [ + sum(user_classes_count.values()) for user_classes_count in dispatch_users.values() + ] self.assertLessEqual( - error_percent, - tol, - "Distribution for user class {} is off by more than {}% when user count is {}".format( - user_class, tol, _user_count(dispatch_users) + max(user_count_on_workers) - min(user_count_on_workers), + 1, + "One or more workers have too much users compared to the other workers when user count is {}".format( + _user_count(dispatch_users) ), ) + for dispatch_users in all_dispatched_users[:-1]: + aggregated_dispatched_users = _aggregate_dispatched_users(dispatch_users) + for user_class in [u for u in user_classes if not u.fixed_count]: + target_relative_weight = user_class.weight / sum( + map(attrgetter("weight"), [u for u in user_classes if not u.fixed_count]) + ) + relative_weight = aggregated_dispatched_users[user_class.__name__] / _user_count(dispatch_users) + error_percent = 100 * (relative_weight - target_relative_weight) / target_relative_weight + tol = 15 + self.assertLessEqual( + error_percent, + tol, + "Distribution for user class {} is off by more than {}% when user count is {}".format( + user_class, tol, _user_count(dispatch_users) + ), + ) + class TestSmallConsecutiveRamping(unittest.TestCase): def test_consecutive_ramp_up_and_ramp_down(self): @@ -3298,11 +3129,16 @@ def __init__(self, fixed_counts: Tuple[int], weights: Tuple[int], target_user_co self.weights = weights self.target_user_count = target_user_count + def __str__(self): + return "".format( + self.fixed_counts, self.weights, self.target_user_count + ) + def case_handler(self, cases: List[RampUpCase], expected: Dict[str, int], user_classes: List[User]): self.assertEqual(len(cases), len(expected)) for case_num in range(len(cases)): - # Reset to defaul values + # Reset to default values for user_class in user_classes: user_class.weight, user_class.fixed_count = 1, 0 @@ -3329,7 +3165,7 @@ def case_handler(self, cases: List[RampUpCase], expected: Dict[str, int], user_c users_dispatcher._wait_between_dispatch = 0 iterations = list(users_dispatcher) - self.assertDictEqual(iterations[-1]["1"], expected[case_num]) + self.assertDictEqual(iterations[-1]["1"], expected[case_num], msg=f"Wrong case {case}") def test_ramp_up_2_weigted_user_with_1_fixed_user(self): class User1(User): @@ -3437,7 +3273,32 @@ class User5(User): user_classes=[User1, User2, User3, User4, User5], ) - def test_ramp_up_ramp_down_and_rump_up_again_fixed(self): + def test_ramp_up_partially_ramp_down_and_rump_up_to_target(self): + class User1(User): + fixed_count = 50 + + class User2(User): + fixed_count = 50 + + target_count = User1.fixed_count + User2.fixed_count + + users_dispatcher = UsersDispatcher(worker_nodes=[WorkerNode("1")], user_classes=[User1, User2]) + users_dispatcher.new_dispatch(target_user_count=30, spawn_rate=0.5) + users_dispatcher._wait_between_dispatch = 0 + iterations = list(users_dispatcher) + self.assertDictEqual(iterations[-1]["1"], {"User1": 15, "User2": 15}) + + users_dispatcher.new_dispatch(target_user_count=20, spawn_rate=0.5) + users_dispatcher._wait_between_dispatch = 0 + iterations = list(users_dispatcher) + self.assertDictEqual(iterations[-1]["1"], {"User1": 10, "User2": 10}) + + users_dispatcher.new_dispatch(target_user_count=target_count, spawn_rate=0.5) + users_dispatcher._wait_between_dispatch = 0 + iterations = list(users_dispatcher) + self.assertDictEqual(iterations[-1]["1"], {"User1": 50, "User2": 50}) + + def test_ramp_up_ramp_down_and_rump_up_again(self): for weights, fixed_counts in [ [(1, 1, 1, 1, 1), (100, 100, 50, 50, 200)], [(1, 1, 1, 1, 1), (100, 150, 50, 50, 0)], diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index d0550bb9b9..483437251d 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1033,6 +1033,102 @@ def tick(self): self.assertEqual("stopped", master.state) + def test_distributed_shape_with_fixed_users(self): + """ + Full integration test that starts both a MasterRunner and three WorkerRunner instances + and tests a basic LoadTestShape with scaling up and down users with 'fixed count' users + """ + + class TestUser(User): + @task + def my_task(self): + pass + + class FixedUser1(User): + fixed_count = 1 + + @task + def my_task(self): + pass + + class FixedUser2(User): + fixed_count = 11 + + @task + def my_task(self): + pass + + class TestShape(LoadTestShape): + def tick(self): + run_time = self.get_run_time() + if run_time < 1: + return 12, 12 + elif run_time < 2: + return 36, 24 + elif run_time < 3: + return 12, 24 + else: + return None + + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3): + test_shape = TestShape() + master_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2], shape_class=test_shape) + master_env.shape_class.reset_time() + master = master_env.create_master_runner("*", 0) + + workers = [] + for _ in range(3): + worker_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2]) + worker = worker_env.create_worker_runner("127.0.0.1", master.server.port) + workers.append(worker) + + # Give workers time to connect + sleep(0.1) + + # Start a shape test + master.start_shape() + sleep(1) + + # Ensure workers have connected and started the correct amount of users (fixed is spawn first) + for worker in workers: + self.assertEqual(4, worker.user_count, "Shape test has not reached stage 1") + self.assertEqual( + 12, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly" + ) + self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0}) + + # Ensure new stage with more users has been reached + sleep(1) + for worker in workers: + self.assertEqual(12, worker.user_count, "Shape test has not reached stage 2") + self.assertEqual( + 36, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly" + ) + self.assertDictEqual( + master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 24} + ) + + # Ensure new stage with less users has been reached + # and expected count of the fixed users is present + sleep(1) + for worker in workers: + self.assertEqual(4, worker.user_count, "Shape test has not reached stage 3") + self.assertEqual( + 12, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly" + ) + self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0}) + + # Ensure test stops at the end + sleep(0.5) + for worker in workers: + self.assertEqual(0, worker.user_count, "Shape test has not stopped") + self.assertEqual( + 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly" + ) + self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 0, "FixedUser2": 0, "TestUser": 0}) + + self.assertEqual(STATE_STOPPED, master.state) + def test_distributed_shape_with_stop_timeout(self): """ Full integration test that starts both a MasterRunner and five WorkerRunner instances