Improve logging messages and clean up code after dispatch refactoring (#1809) #1826

Merged Jul 28, 2021 (10 commits)
7 changes: 7 additions & 0 deletions benchmarks/dispatch.py
@@ -1,3 +1,10 @@
"""
This file contains a benchmark to validate the performance of Locust itself.
More precisely, the performance of the `UsersDispatcher` class, which is responsible
for calculating the distribution of users across the workers. This benchmark is intended
for people working on Locust's development.
"""

import itertools
import statistics
import time
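
The diff above only shows the benchmark's docstring and imports. As a rough, hypothetical sketch of how such a micro-benchmark could drive `UsersDispatcher` directly (the `WorkerNode` constructor arguments and the `_wait_between_dispatch` override are assumptions about internals, not the file's actual contents):

```python
import statistics
import time

from locust import User
from locust.dispatch import UsersDispatcher
from locust.runners import WorkerNode


class UserA(User):
    weight = 5


class UserB(User):
    weight = 1


# Assumption: the node id is the first constructor argument.
workers = [WorkerNode(str(i + 1)) for i in range(10)]
dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=[UserA, UserB])
dispatcher.new_dispatch(target_user_count=1000, spawn_rate=100)
dispatcher._wait_between_dispatch = 0  # reach into internals to skip sleeping between iterations

durations = []
t0 = time.perf_counter()
for users_on_workers in dispatcher:  # each item maps worker id -> {user class name: count}
    durations.append(time.perf_counter() - t0)  # time spent computing this iteration
    t0 = time.perf_counter()

print("median dispatch iteration: %.2f ms" % (1000 * statistics.median(durations)))
```
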
75 changes: 62 additions & 13 deletions locust/dispatch.py
@@ -128,14 +128,14 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:

while self._current_user_count < self._target_user_count:
with self._wait_between_dispatch_iteration_context():
yield self._ramp_up()
yield self._add_users_on_workers()
if self._rebalance:
self._rebalance = False
yield self._users_on_workers

while self._current_user_count > self._target_user_count:
with self._wait_between_dispatch_iteration_context():
yield self._ramp_down()
yield self._remove_users_from_workers()
if self._rebalance:
self._rebalance = False
yield self._users_on_workers
@@ -144,6 +144,8 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:

def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:
"""
Initialize a new dispatch cycle.

:param target_user_count: The desired user count at the end of the dispatch cycle
:param spawn_rate: The spawn rate
"""
@@ -166,26 +168,47 @@ def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:
self._dispatch_iteration_durations.clear()

def add_worker(self, worker_node: "WorkerNode") -> None:
"""
This method is to be called when a new worker connects to the master. The users
dispatcher flags that a rebalance is required so that the next dispatch iteration
redistributes the users across the enlarged pool of workers.

:param worker_node: The worker node to add.
"""
self._worker_nodes.append(worker_node)
self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id)
self._prepare_rebalance()

def remove_worker(self, worker_node_id: str) -> None:
self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node_id]
def remove_worker(self, worker_node: "WorkerNode") -> None:
"""
This method is the counterpart of `add_worker`. When a worker disconnects
(e.g. because of a network or worker failure), it ensures that the next
dispatch iteration redistributes the users across the remaining workers.

:param worker_node: The worker node to remove.
"""
self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]
if len(self._worker_nodes) == 0:
# TODO: Test this
return
self._prepare_rebalance()

def _prepare_rebalance(self) -> None:
"""
When a rebalance is required because of added and/or removed workers, we compute the desired state as if
we started from 0 users. For instance, if 500 users are currently running, `_distribute_users` will
perform a fake ramp-up without any waiting and return the final distribution.
"""
users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)

self._users_on_workers = users_on_workers
self._user_generator = user_gen
self._worker_node_generator = worker_gen
self._active_users = active_users

# TODO: What to do with self._initial_users_on_workers?
# It's important to reset the generators by using the ones from `_distribute_users`
# so that the next iterations are smooth and continuous.
self._user_generator = user_gen
self._worker_node_generator = worker_gen

self._rebalance = True
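
To make the rebalance flow concrete, here is a hedged continuation of the hypothetical sketch shown earlier (reusing the assumed `workers`, `UserA` and `UserB` definitions): adding a worker mid-ramp flags a rebalance, and the dispatcher yields one extra, rebalanced distribution before resuming the ramp.

```python
dispatcher = UsersDispatcher(worker_nodes=workers[:3], user_classes=[UserA, UserB])
dispatcher.new_dispatch(target_user_count=60, spawn_rate=20)

for iteration, users_on_workers in enumerate(dispatcher):
    if iteration == 1:
        # A fourth worker connects while ramping up; because the rebalance flag is now set,
        # the next yielded value redistributes the current users over 4 workers.
        dispatcher.add_worker(workers[3])
    print({worker_id: sum(counts.values()) for worker_id, counts in users_on_workers.items()})
```
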

@@ -210,7 +233,11 @@ def _wait_between_dispatch_iteration_context(self) -> None:
sleep_duration = max(0.0, self._wait_between_dispatch - delta)
gevent.sleep(sleep_duration)

def _ramp_up(self) -> Dict[str, Dict[str, int]]:
def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]:
"""Add users on the workers until the target number of users is reached for the current dispatch iteration

:return: The users that we want to run on the workers
"""
current_user_count_target = min(
self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count
)
@@ -222,7 +249,11 @@ def _ramp_up(self) -> Dict[str, Dict[str, int]]:
if self._current_user_count >= current_user_count_target:
return self._users_on_workers

def _ramp_down(self) -> Dict[str, Dict[str, int]]:
def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]:
"""Remove users from the workers until the target number of users is reached for the current dispatch iteration

:return: The users that we want to run on the workers
"""
current_user_count_target = max(
self._current_user_count - self._user_count_per_dispatch_iteration, self._target_user_count
)
@@ -236,7 +267,6 @@ def _ramp_down(self) -> Dict[str, Dict[str, int]]:
if self._current_user_count == 0 or self._current_user_count <= current_user_count_target:
return self._users_on_workers
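
Both methods clamp the per-iteration target so the last step never overshoots the final target. A minimal, standalone sketch of that arithmetic (`per_iteration` stands in for `_user_count_per_dispatch_iteration`, roughly the number of users spawned or stopped per second):

```python
def next_user_count(current: int, final_target: int, per_iteration: int) -> int:
    """Return the user count to reach in the next dispatch iteration."""
    if current < final_target:  # ramping up
        return min(current + per_iteration, final_target)
    if current > final_target:  # ramping down
        return max(current - per_iteration, final_target)
    return current


assert next_user_count(95, 100, 10) == 100  # the last ramp-up step is truncated
assert next_user_count(8, 0, 10) == 0       # the last ramp-down step is truncated
```
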

# TODO: Test this
def _distribute_users(
self, target_user_count: int
) -> Tuple[dict, Generator[str, None, None], typing.Iterator["WorkerNode"], List[Tuple["WorkerNode", str]]]:
@@ -267,13 +297,32 @@ def _distribute_users(
return users_on_workers, user_gen, worker_gen, active_users

def _user_gen(self) -> Generator[str, None, None]:
# TODO: Explain why we do round(10 * user_class.weight / min_weight)
"""
This method generates users according to their weights using
a smooth weighted round-robin algorithm implemented by https://github.com/linnik/roundrobin.

For example, given users A and B with weights 5 and 1 respectively, this algorithm
yields AAABAAAAABAA. The smooth aspect of this algorithm is what makes it possible
to maintain the weight distribution during ramp-up and ramp-down. A plain
weighted round-robin algorithm would yield AAAAABAAAAAB, which makes the distribution
less accurate while ramping up or down.
"""
# Normalize the weights so that the smallest weight will be equal to "target_min_weight".
# The value "2" was experimentally determined because it gave a better distribution especially
# when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc.
target_min_weight = 2
min_weight = min(u.weight for u in self._user_classes)
normalized_weights = [
(user_class.__name__, round(2 * user_class.weight / min_weight)) for user_class in self._user_classes
(user_class.__name__, round(target_min_weight * user_class.weight / min_weight))
for user_class in self._user_classes
]
gen = smooth(normalized_weights)
# TODO: Explain `generation_length_to_get_proper_distribution`
# Instead of calling `gen()` for each user, we cycle through a generator of fixed length
# `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because
# we only ever need to call `gen()` a relatively small number of times. The length of this generator
# is chosen as the sum of the normalized weights. So, for users A, B, C with weights 2, 5, 6, the length is
# 2 + 5 + 6 = 13, which yields the distribution `CBACBCBCBCABC` that gets repeated over and over
# until the target user count is reached.
generation_length_to_get_proper_distribution = sum(
normalized_weight[1] for normalized_weight in normalized_weights
)
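
As a self-contained illustration of the normalization and fixed-length generation described above (assuming, from the diff's `gen = smooth(...)` / `gen()` usage, that the `roundrobin` package's `smooth()` returns a zero-argument callable yielding the next name):

```python
import itertools

from roundrobin import smooth


class A:
    weight = 1.5


class B:
    weight = 2


class C:
    weight = 2.4


user_classes = [A, B, C]

# Normalize so the smallest weight becomes roughly `target_min_weight`: 1.5, 2, 2.4 -> 2, 3, 3
target_min_weight = 2
min_weight = min(u.weight for u in user_classes)
normalized_weights = [
    (u.__name__, round(target_min_weight * u.weight / min_weight)) for u in user_classes
]

gen = smooth(normalized_weights)

# Pre-compute one period of the distribution (length = 2 + 3 + 3 = 8) and cycle over it
# instead of calling gen() once per spawned user.
generation_length = sum(weight for _, weight in normalized_weights)
name_cycle = itertools.cycle([gen() for _ in range(generation_length)])

print([next(name_cycle) for _ in range(16)])  # the same 8-name pattern, repeated twice
```
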
30 changes: 7 additions & 23 deletions locust/runners.py
@@ -314,7 +314,7 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
worker_nodes=[self._local_worker_node], user_classes=self.user_classes
)

logger.info("Ramping to %d users using a %.2f spawn rate" % (user_count, spawn_rate))
logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))

self._users_dispatcher.new_dispatch(user_count, spawn_rate)

@@ -670,18 +670,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
"Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
)

# Since https://github.com/locustio/locust/pull/1621, the master is responsible for dispatching and controlling
# the total spawn rate which is more CPU intensive for the master. The number 200 is a little arbitrary as the computational
# load on the master greatly depends on the number of workers and the number of user classes. For instance,
# 5 user classes and 5 workers can easily do 200/s. However, 200/s with 50 workers and 20 user classes will likely make the
# dispatch very slow because of the required computations. I (@mboutet) doubt that many Locust's users are
# spawning that rapidly. If so, then they'll likely open issues on GitHub in which case I'll (@mboutet) take a look.
if spawn_rate > 200:
logger.warning(
"Your selected total spawn rate is quite high (>200), and this is known to sometimes cause performance issues on the master. "
"Do you really need to ramp up that fast? If so and if encountering performance issues on the master, free to open an issue."
)

if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
self.stats.clear_all()
self.exceptions = {}
@@ -694,8 +682,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)

try:
dispatched_users = None

for dispatched_users in self._users_dispatcher:
dispatch_greenlets = Group()
for worker_node_id, worker_user_classes_count in dispatched_users.items():
@@ -724,7 +710,6 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
"Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)
)

assert dispatched_users is not None
self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)

except KeyboardInterrupt:
@@ -771,12 +756,9 @@ def _wait_for_workers_report_after_ramp_up(self) -> float:
if match is None:
assert float(locust_wait_for_workers_report_after_ramp_up) >= 0
return float(locust_wait_for_workers_report_after_ramp_up)

if match is not None:
else:
return float(match.group("coeff")) * WORKER_REPORT_INTERVAL

assert False, "not supposed to reach that"
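
For reference, the branches above accept either a plain number of seconds or a string of the form "<coeff> * WORKER_REPORT_INTERVAL". A hypothetical standalone sketch of that parsing (the regex, the environment variable name, and the value of WORKER_REPORT_INTERVAL are assumptions; only the branch structure mirrors the diff):

```python
import re

WORKER_REPORT_INTERVAL = 3.0  # assumed value, for illustration only

_COEFF_RE = re.compile(r"^\s*(?P<coeff>\d+(\.\d+)?)\s*\*\s*WORKER_REPORT_INTERVAL\s*$")


def wait_for_workers_report_after_ramp_up(value: str) -> float:
    match = _COEFF_RE.match(value)
    if match is None:
        assert float(value) >= 0
        return float(value)
    else:
        return float(match.group("coeff")) * WORKER_REPORT_INTERVAL


assert wait_for_workers_report_after_ramp_up("5.5") == 5.5
assert wait_for_workers_report_after_ramp_up("2 * WORKER_REPORT_INTERVAL") == 6.0
```
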

def stop(self, send_stop_to_client: bool = True):
if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
logger.debug("Stopping...")
@@ -836,7 +818,7 @@ def heartbeat_worker(self):
client.state = STATE_MISSING
client.user_classes_count = {}
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(client.id)
self._users_dispatcher.remove_worker(client)
# TODO: If status is `STATE_RUNNING`, call self.start()
if self.worker_count <= 0:
logger.info("The last worker went missing, stopping test.")
@@ -891,9 +873,10 @@ def client_listener(self):
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
elif msg.type == "client_stopped":
client = self.clients[msg.node_id]
del self.clients[msg.node_id]
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(msg.node_id)
self._users_dispatcher.remove_worker(client)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
@@ -932,9 +915,10 @@ def client_listener(self):
self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]
elif msg.type == "quit":
if msg.node_id in self.clients:
client = self.clients[msg.node_id]
del self.clients[msg.node_id]
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(msg.node_id)
self._users_dispatcher.remove_worker(client)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)