Use worker_node in remove_worker for consistency
I also added comments and docstrings to explain what's going on in some places.
mboutet committed Jul 28, 2021
1 parent 9099184 commit 06b4208
Showing 3 changed files with 42 additions and 15 deletions.
31 changes: 28 additions & 3 deletions locust/dispatch.py
@@ -144,6 +144,8 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:
 
     def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:
         """
+        Initialize a new dispatch cycle.
+
         :param target_user_count: The desired user count at the end of the dispatch cycle
         :param spawn_rate: The spawn rate
         """
@@ -166,24 +168,47 @@ def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:
         self._dispatch_iteration_durations.clear()
 
     def add_worker(self, worker_node: "WorkerNode") -> None:
+        """
+        This method is to be called when a new worker connects to the master. When
+        a new worker is added, the users dispatcher will flag that a rebalance is required
+        and ensure that the next dispatch iteration redistributes the users
+        on the new pool of workers.
+
+        :param worker_node: The worker node to add.
+        """
         self._worker_nodes.append(worker_node)
         self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id)
         self._prepare_rebalance()
 
-    def remove_worker(self, worker_node_id: str) -> None:
-        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node_id]
+    def remove_worker(self, worker_node: "WorkerNode") -> None:
+        """
+        This method is similar to `add_worker` above. When a worker disconnects
+        (e.g. because of a network or worker failure), this method will ensure that the next
+        dispatch iteration redistributes the users on the remaining workers.
+
+        :param worker_node: The worker node to remove.
+        """
+        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
         if len(self._worker_nodes) == 0:
             # TODO: Test this
             return
         self._prepare_rebalance()
 
     def _prepare_rebalance(self) -> None:
+        """
+        When a rebalance is required because of added and/or removed workers, we compute the desired state as if
+        we started from 0 users. So, if we are currently running 500 users, then `_distribute_users` will
+        perform a fake ramp-up without any waiting and return the final distribution.
+        """
         users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)
 
         self._users_on_workers = users_on_workers
+        self._active_users = active_users
 
+        # It's important to reset the generators by using the ones from `_distribute_users`
+        # so that the next iterations are smooth and continuous.
         self._user_generator = user_gen
         self._worker_node_generator = worker_gen
-        self._active_users = active_users
 
         self._rebalance = True
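To make the new contract concrete, here is a minimal, self-contained sketch of the pattern the dispatch.py hunk establishes. This is a toy dispatcher, not Locust's actual implementation; `WorkerNode` below is a hypothetical stand-in exposing only the `id` attribute, which is all the real dispatcher relies on here.

from typing import List

class WorkerNode:
    """Toy stand-in: the only attribute the dispatcher logic needs is `id`."""

    def __init__(self, id: str) -> None:
        self.id = id

class ToyDispatcher:
    def __init__(self, worker_nodes: List[WorkerNode]) -> None:
        self._worker_nodes = sorted(worker_nodes, key=lambda w: w.id)
        self._rebalance = False

    def add_worker(self, worker_node: WorkerNode) -> None:
        # Keep workers sorted by id and flag a rebalance, as the real method does.
        self._worker_nodes = sorted(self._worker_nodes + [worker_node], key=lambda w: w.id)
        self._rebalance = True

    def remove_worker(self, worker_node: WorkerNode) -> None:
        # Symmetric with add_worker: the id is derived from the node, not passed in.
        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
        if self._worker_nodes:  # with no workers left there is nothing to rebalance onto
            self._rebalance = True

node = WorkerNode("worker-1")
dispatcher = ToyDispatcher([node, WorkerNode("worker-2")])
dispatcher.remove_worker(node)  # the caller passes the node it holds, not a bare id string

The design choice is simply that both halves of the worker lifecycle take the same argument type, so call sites never convert between nodes and id strings.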
8 changes: 5 additions & 3 deletions locust/runners.py
@@ -818,7 +818,7 @@ def heartbeat_worker(self):
                     client.state = STATE_MISSING
                     client.user_classes_count = {}
                     if self._users_dispatcher is not None:
-                        self._users_dispatcher.remove_worker(client.id)
+                        self._users_dispatcher.remove_worker(client)
                         # TODO: If status is `STATE_RUNNING`, call self.start()
                     if self.worker_count <= 0:
                         logger.info("The last worker went missing, stopping test.")
@@ -873,9 +873,10 @@ def client_listener(self):
                 # if abs(time() - msg.data["time"]) > 5.0:
                 #     warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
             elif msg.type == "client_stopped":
+                client = self.clients[msg.node_id]
                 del self.clients[msg.node_id]
                 if self._users_dispatcher is not None:
-                    self._users_dispatcher.remove_worker(msg.node_id)
+                    self._users_dispatcher.remove_worker(client)
                     if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                         # TODO: Test this situation
                         self.start(self.target_user_count, self.spawn_rate)
@@ -914,9 +915,10 @@ def client_listener(self):
                 self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]
             elif msg.type == "quit":
                 if msg.node_id in self.clients:
+                    client = self.clients[msg.node_id]
                     del self.clients[msg.node_id]
                     if self._users_dispatcher is not None:
-                        self._users_dispatcher.remove_worker(msg.node_id)
+                        self._users_dispatcher.remove_worker(client)
                         if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                             # TODO: Test this situation
                             self.start(self.target_user_count, self.spawn_rate)
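All three runners.py call sites follow the same look-up-before-delete ordering: the `WorkerNode` must be captured before it is removed from `self.clients`, because `remove_worker` now needs the object rather than the id. A hedged sketch of that ordering, reusing the toy classes from the sketch above (hypothetical names, not Locust's actual runner code):

clients = {"worker-1": WorkerNode("worker-1"), "worker-2": WorkerNode("worker-2")}
dispatcher = ToyDispatcher(list(clients.values()))

def on_worker_quit(node_id: str) -> None:
    if node_id in clients:
        client = clients[node_id]  # grab the node object first...
        del clients[node_id]       # ...then drop it from the registry...
        dispatcher.remove_worker(client)  # ...since remove_worker still needs the object

on_worker_quit("worker-1")
assert [w.id for w in dispatcher._worker_nodes] == ["worker-2"]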
18 changes: 9 additions & 9 deletions locust/test/test_dispatch.py
@@ -2385,7 +2385,7 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
 
         # Re-balance
         ts = time.perf_counter()
@@ -2447,8 +2447,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])
 
         # Re-balance
         ts = time.perf_counter()
@@ -2489,7 +2489,7 @@ class User3(User):
 
         list(users_dispatcher)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
 
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)
 
@@ -2554,8 +2554,8 @@ class User3(User):
 
         list(users_dispatcher)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])
 
         users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=3)
 
@@ -2639,7 +2639,7 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
 
         # Re-balance
         ts = time.perf_counter()
@@ -2705,8 +2705,8 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 4)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)
 
-        users_dispatcher.remove_worker(worker_nodes[1].id)
-        users_dispatcher.remove_worker(worker_nodes[2].id)
+        users_dispatcher.remove_worker(worker_nodes[1])
+        users_dispatcher.remove_worker(worker_nodes[2])
 
         # Re-balance
         ts = time.perf_counter()
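The test_dispatch.py changes are mechanical: every `remove_worker(worker_nodes[i].id)` becomes `remove_worker(worker_nodes[i])`. A simplified echo of the rebalance pattern these tests exercise, again with the toy dispatcher from above rather than the real one:

nodes = [WorkerNode(f"worker-{i}") for i in range(3)]
toy = ToyDispatcher(nodes)

toy.remove_worker(nodes[1])  # pass the node object, not nodes[1].id
assert toy._rebalance        # the next dispatch iteration must redistribute users
assert [w.id for w in toy._worker_nodes] == ["worker-0", "worker-2"]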
