Skip to content

Commit

Permalink
Add test for removing last workers in users dispatcher
Browse files Browse the repository at this point in the history
Also add more assertions for `UsersDispatcher._rebalance` in dispatch tests.
  • Loading branch information
mboutet committed Jul 28, 2021
1 parent 06b4208 commit ff687c4
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions locust/test/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2385,8 +2385,12 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2398,6 +2402,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2447,9 +2453,13 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])
users_dispatcher.remove_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2460,6 +2470,8 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2489,10 +2501,16 @@ class User3(User):

list(users_dispatcher)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

self.assertTrue(users_dispatcher._rebalance)

sleep_time = 1

# Re-balance
Expand All @@ -2506,6 +2524,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 5)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2554,11 +2574,17 @@ class User3(User):

list(users_dispatcher)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])
users_dispatcher.remove_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

self.assertTrue(users_dispatcher._rebalance)

sleep_time = 1

# Re-balance
Expand All @@ -2571,6 +2597,8 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 3, "User2": 3, "User3": 3})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 9)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2639,8 +2667,12 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2652,6 +2684,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 6)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2705,9 +2739,13 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])
users_dispatcher.remove_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2718,6 +2756,8 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 4})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 12)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2726,6 +2766,41 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 3, "User2": 3, "User3": 3})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 9)

def test_remove_last_worker(self):
class User1(User):
weight = 1

class User2(User):
weight = 1

class User3(User):
weight = 1

user_classes = [User1, User2, User3]

worker_nodes = [WorkerNode(str(i + 1)) for i in range(1)]

users_dispatcher = UsersDispatcher(worker_nodes=worker_nodes, user_classes=user_classes)

users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
users_dispatcher._wait_between_dispatch = 0

# Dispatch iteration 1
dispatched_users = next(users_dispatcher)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 1, "User2": 1, "User3": 1})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)

# Dispatch iteration 2
dispatched_users = next(users_dispatcher)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[0])

self.assertFalse(users_dispatcher._rebalance)


class TestAddWorker(unittest.TestCase):
def test_add_worker_during_ramp_up(self):
Expand Down Expand Up @@ -2766,8 +2841,12 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2780,6 +2859,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2826,9 +2907,13 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])
users_dispatcher.add_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -2841,6 +2926,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2872,10 +2959,16 @@ class User3(User):

list(users_dispatcher)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

self.assertTrue(users_dispatcher._rebalance)

sleep_time = 1

# Re-balance
Expand All @@ -2890,6 +2983,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -2941,11 +3036,17 @@ class User3(User):

list(users_dispatcher)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])
users_dispatcher.add_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)

self.assertTrue(users_dispatcher._rebalance)

sleep_time = 1

# Re-balance
Expand All @@ -2960,6 +3061,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -3032,8 +3135,12 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 6)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 6)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -3046,6 +3153,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down Expand Up @@ -3096,9 +3205,13 @@ class User3(User):
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 4})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 12)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])
users_dispatcher.add_worker(worker_nodes[2])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand All @@ -3111,6 +3224,8 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
Expand Down

0 comments on commit ff687c4

Please sign in to comment.