Move User selection responsibility from worker to master in order to fix unbalanced distribution of users and uneven ramp-up #1621

Merged Jul 5, 2021 (149 commits)
Changes from 147 commits
Commits
f4f8711
Use random.choices in locust.runners.Runner.weight_users instead of c…
mboutet Nov 8, 2020
5ec8090
wip: users distribution & dispatch algorithm
mboutet Nov 13, 2020
394e024
Code cleanup
mboutet Nov 13, 2020
7d54784
Code cleanup
mboutet Nov 13, 2020
408f3ed
Code cleanup/refactoring
mboutet Nov 13, 2020
3b8167d
Update dispatch logic
mboutet Nov 16, 2020
5ecf8b1
Cleanup some imports
mboutet Nov 16, 2020
41a242d
type hint compatible with python 3.6
mboutet Nov 16, 2020
01e8b7c
Fix sleep in dispatch logic
mboutet Nov 16, 2020
d9bb87f
Refactoring for handling new users distribution and dispatch logic
mboutet Nov 16, 2020
c5799df
Add more tests + code cleanup
mboutet Nov 16, 2020
41f6ac2
Update signature of on_spawning_complete to take user_class_occurrenc…
mboutet Nov 16, 2020
3d41b58
Bugfix
mboutet Nov 16, 2020
5d5f732
Bugfix + fix typo
mboutet Nov 16, 2020
7450c9f
Fix failing test
mboutet Nov 16, 2020
bdce0a3
Fix failing test
mboutet Nov 16, 2020
5cad9a4
Update input event to work with new users distribution logic
mboutet Nov 17, 2020
dc18e77
Run Black
mboutet Nov 17, 2020
8804748
Set master state to "spawning" before waiting for dispatch greenlets
mboutet Nov 17, 2020
a1583e7
Add distributed shape integration test (with stop_timeout)
mboutet Nov 17, 2020
2724a32
Simplify type hint for _create_runner method
mboutet Nov 17, 2020
cc19073
Make input_listener more flexible
mboutet Nov 17, 2020
6810494
Update state before dispatching + fix flaky test
mboutet Nov 17, 2020
67a1dd2
Ensure key presses are unique across both dictionaries
mboutet Nov 17, 2020
62953c1
Rephrase comment
mboutet Nov 17, 2020
00eee48
Add doc strings
mboutet Nov 17, 2020
da310d5
Fix typo
mboutet Nov 17, 2020
82f4d91
Simplify users distribution algorithm (recursion is no longer used)
mboutet Nov 18, 2020
eb14c9b
Further simplify users distribution code
mboutet Nov 18, 2020
f823138
Remove unused function
mboutet Nov 18, 2020
8750bf9
Restore former input_listener signature & logic
mboutet Nov 18, 2020
e9c8fae
Add comments and doctrings
mboutet Nov 18, 2020
b4386b9
Remove unused imports
mboutet Nov 18, 2020
0b1ae30
Fix test
mboutet Nov 18, 2020
34755fc
Remove unused code
mboutet Nov 18, 2020
0fd881b
Relax pass criteria
mboutet Nov 18, 2020
04485c7
Remove unused condition in elif
mboutet Nov 18, 2020
b593a61
Remove duplicated test
mboutet Nov 18, 2020
15730f0
Combine two tests into a single test
mboutet Nov 18, 2020
3381be8
Rephrasing some comments + add type hints
mboutet Nov 18, 2020
8f84ddc
Bugfix when running in local runner
mboutet Nov 18, 2020
72d6b5d
Handle self.shape_greenlet = None
mboutet Nov 26, 2020
52e39ff
Sort workers by id
mboutet Dec 1, 2020
80dee44
Improve logic to balance users across workers so that each worker get…
mboutet Dec 9, 2020
0b82be4
Revert on_spawning_complete to previous signature
mboutet Feb 10, 2021
3232b36
Run Black on codebase
mboutet Feb 10, 2021
5972e2b
Remove duplicate imports
mboutet Mar 2, 2021
b02fa48
Adapt changes of ec1d27fa7c523da151f9e9aafcd4a64468adae90
mboutet Mar 3, 2021
6553423
Merge branch 'master' into users-distribution
mboutet Mar 3, 2021
327541f
Increase sleep time to reduce test flakyness
mboutet Mar 3, 2021
47590f0
Handle keyboard interrupt while in a gevent.sleep in dispatch_users
mboutet Mar 3, 2021
34e6338
Increase sleep time to reduce test flakyness
mboutet Mar 4, 2021
116fbdc
Refactor test to hopefully reduce flakyness
mboutet Mar 4, 2021
ea52674
Add TODO to handle KeyboardInterrupt when waiting for workers connect
mboutet Mar 7, 2021
2a28a7e
Make sure stats printer run when using headless mode
mboutet Mar 7, 2021
4d68f14
Add log indicating that spawning is complete after a ramp up
mboutet Mar 7, 2021
61399bd
Rework logging to be less spammy when level is INFO
mboutet Mar 7, 2021
e1c1c4d
Merge branch 'master' into users-distribution
mboutet Mar 12, 2021
6bb4b41
Ensure user class are balanced during ramp-up
mboutet Mar 15, 2021
8afbd15
Relocate duplicate code block in function
mboutet Mar 15, 2021
2833b88
Handle user_greenlet.args[0] IndexError
mboutet Mar 16, 2021
078bd05
Merge branch 'master' into users-distribution
mboutet May 3, 2021
d4171a0
Merge branch 'master' into users-distribution
mboutet May 4, 2021
abd183a
Fix typo in comments
mboutet May 4, 2021
162e87a
Also include "user_count" in message payload
mboutet May 4, 2021
f9d0f96
Merge branch 'master' into users-distribution
mboutet May 13, 2021
7f01a4d
Merge branch 'master' into users-distribution
mboutet May 28, 2021
45dbffc
Fix performance issue
mboutet May 31, 2021
487873b
Add another assertion in distribution logic
mboutet May 31, 2021
fc4dc02
Use time.perf_counter() as time.perf_counter_ns() is not in py3.6
mboutet Jun 2, 2021
9656102
Introduce WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP
mboutet Jun 2, 2021
e6288c5
Improve assertions in dispatch logic to handle lots of workers
mboutet Jun 3, 2021
f5fed17
Add debug log to show users during ramp-up
mboutet Jun 3, 2021
231efc3
Make TestDispatchUsersToWorkersFuzzy deterministic
mboutet Jun 3, 2021
e83b166
Ensure that the distribution is respected during a ramp-up
mboutet Jun 3, 2021
ed4b3b3
Use time.perf_counter() instead of time.time() in dispatch
mboutet Jun 4, 2021
cc4a208
Optimize hotspots in dispatch.py using line_profiler
mboutet Jun 4, 2021
d798796
Use fractions instead of percentages
mboutet Jun 9, 2021
71a1aa4
Remove `get_` prefix in function name
mboutet Jun 9, 2021
81f6d66
Ensure that the exception objects are not mutated when generated report
mboutet Jun 9, 2021
10ed15b
Fix bug & improve logic in dispatcher
mboutet Jun 11, 2021
331b204
Merge remote-tracking branch 'locustio/master' into users-distribution
mboutet Jun 11, 2021
a9b581c
Don't copy user_class_occurrences as it is copied inside the function
mboutet Jun 12, 2021
75565d3
Apply formatting, review wording in comments, rename variables
mboutet Jun 12, 2021
1c7e30c
Use `attrgetter` instead of `lambda` for efficiency
mboutet Jun 12, 2021
fa2eaa9
Add comments and refactor to make code clearer (hopefully)
mboutet Jun 12, 2021
334b5b6
Rename `user_class_occurrences` to `user_classes_count`
mboutet Jun 21, 2021
9a82cf2
Rename "dummy worker node" to "local worker node"
mboutet Jun 21, 2021
73bc6c1
Rename `go_to_next_user_class` to be more meaningful
mboutet Jun 21, 2021
3a34fe6
Use `user_count` instead of `number_of_users` for consistency
mboutet Jun 21, 2021
3786edd
Relocate comments on implementation details close to code
mboutet Jun 21, 2021
da0623e
Add a comment justifying the use of `.copy()`
mboutet Jun 21, 2021
314e2db
Rename `x_2_y` variables to something more natural
mboutet Jun 21, 2021
b5ab273
Revert improvements for default argument value being mutable
mboutet Jun 21, 2021
3239f21
Do not use intermediate and useless `users_dispatcher` variable
mboutet Jun 21, 2021
6ac0fa8
Fix wording in comment
mboutet Jun 21, 2021
f3db6e1
bis: Do not use intermediate and useless `users_dispatcher` variable
mboutet Jun 21, 2021
121407f
Remove slow and redundant tests in `test_dispatch.py`
mboutet Jun 21, 2021
c168529
Make dict comprehension more readable
mboutet Jun 21, 2021
0b6c7e0
Raise exception if multiple user classes have the same name
mboutet Jun 22, 2021
f91539d
Minor fix in test: `WorkerNode` takes a string as the name, not an int
mboutet Jun 22, 2021
496df0a
Refactor dispatch to use a single `UsersDispatcher` class
mboutet Jun 22, 2021
54ac967
Apply black with `--skip-magic-trailing-comma` on distribution code
mboutet Jun 22, 2021
3323816
Remove `_user_count_left_to_dispatch` getter
mboutet Jun 22, 2021
f8b4809
Rename `dispatch_generator` to reduce confusion
mboutet Jun 22, 2021
eacd1e8
Get rid of `_DistancesFromIdealDistribution` named tuple
mboutet Jun 22, 2021
10004da
Sort user classes once
mboutet Jun 22, 2021
539d44e
Simplify code by removing if conditional
mboutet Jun 22, 2021
a86698e
Refactor naming of some methods and variables in dispatch module
mboutet Jun 22, 2021
031e529
Add docstring and small refactoring for dispatch code
mboutet Jun 23, 2021
7fed917
Remove `_assert_computation_duration_of_dispatch_is_reasonable`
mboutet Jun 23, 2021
182918d
Remove unused variable
mboutet Jun 23, 2021
07e727e
Merge remote-tracking branch 'locustio/master' into users-distribution
mboutet Jun 23, 2021
97c046e
Merge remote-tracking branch 'locustio/master' into users-distribution
mboutet Jun 23, 2021
2b306b2
Fix indentation + remove space on empty line
mboutet Jun 23, 2021
1df84d6
Cast worker node name to string in test
mboutet Jun 23, 2021
611bdc0
Ensure master's state is `spawning` during ramp-up
mboutet Jun 24, 2021
53e9861
[draft] Ensure best-effort round-robin dispatch of users to workers
mboutet Jun 25, 2021
89d8b74
Prepend `LOCUST_` to environment variables
mboutet Jun 25, 2021
eb3c9ac
Remove commented code which is no longer in use
mboutet Jun 25, 2021
49fbeb1
Implement `LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP`
mboutet Jun 25, 2021
e9c577e
Use a named variable instead of hard-coding a constant in test
mboutet Jun 25, 2021
7aa1193
Fix `_patch_env` helper not correctly tearing down itself
mboutet Jun 25, 2021
1efb253
Clear all `functools.lru_cache` between tests
mboutet Jun 25, 2021
9651768
Improve dispatch for some corner cases + code cleanup
mboutet Jun 25, 2021
110669e
Reduce test flakyness by waiting for desired result
mboutet Jun 25, 2021
32906d4
Add comments to explain complex logic in dispatch
mboutet Jun 25, 2021
8fd81f0
Swallow warnings in `clear_all_functools_lru_cache` helper
mboutet Jun 25, 2021
eb8c396
Delete uncovered code in dispatch
mboutet Jun 25, 2021
8f8dd7e
Merge remote-tracking branch 'locustio/master' into users-distribution
mboutet Jun 27, 2021
cb0121c
[draft] Fix additional corner cases in dispatcher
mboutet Jun 27, 2021
a5569c5
Fix corner cases in dispatch logic causing infinite loops
mboutet Jun 28, 2021
6a41004
Print master's state if assertion fails
mboutet Jun 28, 2021
cf129ca
Ensure that `/swarm` endpoint is not blocking
mboutet Jun 28, 2021
c9acba0
Attempt at reducing test flakyness
mboutet Jun 29, 2021
0d9a238
Attempt at reducing test flakyness
mboutet Jun 29, 2021
3e4f09d
Set dispatcher tolerance to 25ms
mboutet Jun 29, 2021
7ca69b8
Use perf counter in dispatch tests
mboutet Jun 29, 2021
7cad475
Use free random port for zrpc test
mboutet Jun 29, 2021
1676436
Do not use `get_free_tcp_port` for zmqrpc test
mboutet Jun 29, 2021
cb585b4
Fix outdated docstring
mboutet Jul 1, 2021
5379f9e
Add TODO regarding ill-defined load test shape
mboutet Jul 2, 2021
a96f403
Add TODO regarding args missing from greenlet
mboutet Jul 2, 2021
04ff90f
Make sure `host` is set on the user_class of the workers
mboutet Jul 2, 2021
2cf679f
Refactor logging to be less verbose when level is INFO
mboutet Jul 2, 2021
d749485
Reinstate the previous warning regarding the workers' spawn rate
mboutet Jul 4, 2021
11a7825
Use `Ramping to %d users using a %.2f spawn rate` as the log msg
mboutet Jul 4, 2021
c9eb174
Increase warning threshold for total spawn rate
mboutet Jul 4, 2021
7d67e23
Remove unused import in html.py
mboutet Jul 4, 2021
596 changes: 596 additions & 0 deletions locust/dispatch.py

Large diffs are not rendered by default.

115 changes: 115 additions & 0 deletions locust/distribution.py
@@ -0,0 +1,115 @@
import math
from itertools import combinations_with_replacement
from operator import attrgetter
from typing import Dict, List, Type

from locust import User


def weight_users(user_classes: List[Type[User]], user_count: int) -> Dict[str, int]:
    """
    Compute the desired state of users using the weight of each user class.

    :param user_classes: the list of user class
    :param user_count: total number of users
    :return: the set of users to run
    """
    assert user_count >= 0

    if len(user_classes) == 0:
        return {}

    user_classes = sorted(user_classes, key=attrgetter("__name__"))

    user_classes_count = {user_class.__name__: 0 for user_class in user_classes}

    # If the number of users is less than the number of user classes, at most one user of each user class
    # is chosen. User classes with higher weight are chosen first.
    if user_count <= len(user_classes):
        user_classes_count.update(
            {
                user_class.__name__: 1
                for user_class in sorted(user_classes, key=attrgetter("weight"), reverse=True)[:user_count]
            }
        )
        return user_classes_count

    # If the number of users is greater than or equal to the number of user classes, at least one user of each
    # user class will be chosen. The greater number of users is, the better the actual distribution
    # of users will match the desired one (as dictated by the weight attributes).
    weights = list(map(attrgetter("weight"), user_classes))
    relative_weights = [weight / sum(weights) for weight in weights]
    user_classes_count = {
        user_class.__name__: round(relative_weight * user_count) or 1
        for user_class, relative_weight in zip(user_classes, relative_weights)
    }

    if sum(user_classes_count.values()) == user_count:
        return user_classes_count

    else:
        user_classes_count = _find_ideal_users_to_add_or_remove(
            user_classes, user_count - sum(user_classes_count.values()), user_classes_count
        )
        assert sum(user_classes_count.values()) == user_count
        return user_classes_count


def _find_ideal_users_to_add_or_remove(
    user_classes: List[Type[User]], user_count_to_add_or_remove: int, user_classes_count: Dict[str, int]
) -> Dict[str, int]:
    sign = -1 if user_count_to_add_or_remove < 0 else 1

    user_count_to_add_or_remove = abs(user_count_to_add_or_remove)

    assert user_count_to_add_or_remove <= len(user_classes), user_count_to_add_or_remove

    # Formula for combination with replacement
    # (https://www.tutorialspoint.com/statistics/combination_with_replacement.htm)
    number_of_combinations = math.factorial(len(user_classes) + user_count_to_add_or_remove - 1) / (
        math.factorial(user_count_to_add_or_remove) * math.factorial(len(user_classes) - 1)
    )

    # If the number of combinations with replacement is above this threshold, we simply add/remove
    # users for the first "number_of_users_to_add_or_remove" users. Otherwise, computing the best
    # distribution is too expensive in terms of computation.
    max_number_of_combinations_threshold = 1000

    if number_of_combinations <= max_number_of_combinations_threshold:
        user_classes_count_candidates: Dict[float, Dict[str, int]] = {}
        for user_classes_combination in combinations_with_replacement(user_classes, user_count_to_add_or_remove):
            # Copy in order to not mutate `user_classes_count` for the parent scope
            user_classes_count_candidate = user_classes_count.copy()
            for user_class in user_classes_combination:
                user_classes_count_candidate[user_class.__name__] += sign
            distance = distance_from_desired_distribution(user_classes, user_classes_count_candidate)
            if distance not in user_classes_count_candidates:
                user_classes_count_candidates[distance] = user_classes_count_candidate.copy()

        return user_classes_count_candidates[min(user_classes_count_candidates.keys())]

    else:
        # Copy in order to not mutate `user_classes_count` for the parent scope
        user_classes_count_candidate = user_classes_count.copy()
        for user_class in user_classes[:user_count_to_add_or_remove]:
            user_classes_count_candidate[user_class.__name__] += sign
        return user_classes_count_candidate


def distance_from_desired_distribution(user_classes: List[Type[User]], user_classes_count: Dict[str, int]) -> float:
    actual_ratio_of_user_class = {
        user_class: user_class_count / sum(user_classes_count.values())
        for user_class, user_class_count in user_classes_count.items()
    }

    expected_ratio_of_user_class = {
        user_class.__name__: user_class.weight / sum(map(attrgetter("weight"), user_classes))
        for user_class in user_classes
    }

    differences = [
        actual_ratio_of_user_class[user_class] - expected_ratio
        for user_class, expected_ratio in expected_ratio_of_user_class.items()
    ]

    return math.sqrt(math.fsum(map(lambda x: x ** 2, differences)))
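
The round-then-correct approach used by `weight_users` can be tried out without Locust installed. The following is a minimal sketch, not the PR's code: it takes a plain dict of weights instead of `User` classes, the helper names `rounded_shares`, `distance` and `correct` are hypothetical, and it always enumerates every candidate (the 1000-combination threshold is left out).

```python
import math
from itertools import combinations_with_replacement
from typing import Dict


def rounded_shares(weights: Dict[str, int], user_count: int) -> Dict[str, int]:
    """Round each class's weighted share, keeping at least one user per class."""
    total_weight = sum(weights.values())
    return {name: round(weight / total_weight * user_count) or 1 for name, weight in weights.items()}


def distance(weights: Dict[str, int], counts: Dict[str, int]) -> float:
    """Euclidean distance between the actual and the desired user ratios."""
    total_weight = sum(weights.values())
    total_users = sum(counts.values())
    return math.sqrt(
        math.fsum((counts[name] / total_users - weight / total_weight) ** 2 for name, weight in weights.items())
    )


def correct(weights: Dict[str, int], counts: Dict[str, int], user_count: int) -> Dict[str, int]:
    """Add or remove the users lost to rounding, keeping the closest candidate."""
    missing = user_count - sum(counts.values())
    sign = -1 if missing < 0 else 1
    best = None
    for combination in combinations_with_replacement(sorted(weights), abs(missing)):
        candidate = dict(counts)  # copy so the caller's dict is not mutated
        for name in combination:
            candidate[name] += sign
        candidate_distance = distance(weights, candidate)
        if best is None or candidate_distance < best[0]:
            best = (candidate_distance, candidate)
    return best[1]


# Hypothetical weights: rounding yields 1 + 1 + 2 = 4 users, one short of 5.
weights = {"UserA": 1, "UserB": 1, "UserC": 2}
counts = rounded_shares(weights, 5)
final = correct(weights, counts, 5)
print(counts, final)
```

In this example the missing user goes to `UserC`, because that candidate has the smallest distance from the 25% / 25% / 50% target.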
41 changes: 34 additions & 7 deletions locust/env.py
@@ -1,3 +1,11 @@
from operator import methodcaller
from typing import (
Dict,
List,
Type,
TypeVar,
)

from .event import Events
from .exception import RunnerAlreadyExistsError
from .stats import RequestStats
@@ -6,7 +14,9 @@
from .user import User
from .user.task import filter_tasks_by_tags
from .shape import LoadTestShape
from typing import List


RunnerType = TypeVar("RunnerType", bound=Runner)


class Environment:
@@ -16,7 +26,7 @@ class Environment:
See :ref:`events` for available events.
"""

user_classes: List[User] = []
user_classes: List[Type[User]] = []
"""User classes that the runner will run"""

shape_class: LoadTestShape = None
@@ -95,24 +105,37 @@ def __init__(

self._filter_tasks_by_tags()

def _create_runner(self, runner_class, *args, **kwargs):
# Validate there's no class with the same name but in different modules
if len(set(user_class.__name__ for user_class in self.user_classes)) != len(self.user_classes):
raise ValueError(
"The following user classes have the same class name: {}".format(
", ".join(map(methodcaller("fullname"), self.user_classes))
)
)

def _create_runner(
self,
runner_class: Type[RunnerType],
*args,
**kwargs,
) -> RunnerType:
if self.runner is not None:
raise RunnerAlreadyExistsError("Environment.runner already exists (%s)" % self.runner)
self.runner = runner_class(self, *args, **kwargs)
self.runner: RunnerType = runner_class(self, *args, **kwargs)

# Attach the runner to the shape class so that the shape class can access user count state
if self.shape_class:
self.shape_class.runner = self.runner

return self.runner

def create_local_runner(self):
def create_local_runner(self) -> LocalRunner:
"""
Create a :class:`LocalRunner <locust.runners.LocalRunner>` instance for this Environment
"""
return self._create_runner(LocalRunner)

def create_master_runner(self, master_bind_host="*", master_bind_port=5557):
def create_master_runner(self, master_bind_host="*", master_bind_port=5557) -> MasterRunner:
"""
Create a :class:`MasterRunner <locust.runners.MasterRunner>` instance for this Environment

@@ -126,7 +149,7 @@ def create_master_runner(self, master_bind_host="*", master_bind_port=5557):
master_bind_port=master_bind_port,
)

def create_worker_runner(self, master_host, master_port):
def create_worker_runner(self, master_host, master_port) -> WorkerRunner:
"""
Create a :class:`WorkerRunner <locust.runners.WorkerRunner>` instance for this Environment

@@ -191,3 +214,7 @@ def _filter_tasks_by_tags(self):

for user_class in self.user_classes:
filter_tasks_by_tags(user_class, self.tags, self.exclude_tags)

@property
def user_classes_by_name(self) -> Dict[str, Type[User]]:
return {u.__name__: u for u in self.user_classes}
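
Since `user_classes_by_name` keys classes by `__name__`, two classes that share a name would collapse into a single entry, which is presumably why `__init__` now rejects duplicates. A small standard-library sketch of that situation follows; `find_duplicate_class_names` and the module names are made up for illustration and are not part of the PR.

```python
from collections import Counter
from typing import List


def find_duplicate_class_names(classes: List[type]) -> List[str]:
    """Return the class names that occur more than once, sorted alphabetically."""
    name_counts = Counter(cls.__name__ for cls in classes)
    return sorted(name for name, count in name_counts.items() if count > 1)


# Two distinct classes with the same name, as if defined in different modules.
first = type("MyUser", (), {"__module__": "module_a"})
second = type("MyUser", (), {"__module__": "module_b"})
other = type("OtherUser", (), {"__module__": "module_a"})

classes = [first, second, other]
duplicates = find_duplicate_class_names(classes)

# A name-keyed dict silently keeps only the last class for each name.
by_name = {cls.__name__: cls for cls in classes}
print(duplicates, len(by_name))
```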
2 changes: 2 additions & 0 deletions locust/html.py
@@ -1,3 +1,5 @@
from copy import deepcopy

from jinja2 import Environment, FileSystemLoader
import os
import pathlib
4 changes: 3 additions & 1 deletion locust/input_events.py
@@ -1,3 +1,5 @@
from typing import Dict

import gevent
import logging
import os
@@ -86,7 +88,7 @@ def get_poller():
return UnixKeyPoller()


def input_listener(key_to_func_map):
def input_listener(key_to_func_map: Dict[str, callable]):
def input_listener_func():
try:
poller = get_poller()
19 changes: 13 additions & 6 deletions locust/main.py
@@ -306,6 +306,7 @@ def main():
web_ui.start()
main_greenlet = web_ui.greenlet

headless_master_greenlet = None
if options.headless:
# headless mode
if options.master:
@@ -316,6 +317,9 @@ def main():
len(runner.clients.ready),
options.expect_workers,
)
# TODO: Handle KeyboardInterrupt and send quit signal to workers that are started.
# Right now, if the user sends a ctrl+c, the master will not gracefully
# shutdown resulting in all the already started workers to stay active.
time.sleep(1)
if not options.worker:
# apply headless mode defaults
@@ -328,7 +332,8 @@ def main():
if environment.shape_class:
environment.runner.start_shape()
else:
runner.start(options.num_users, options.spawn_rate)
headless_master_greenlet = gevent.spawn(runner.start, options.num_users, options.spawn_rate)
headless_master_greenlet.link_exception(greenlet_exception_handler)

def spawn_run_time_limit_greenlet():
def timelimit_stop():
@@ -351,19 +356,19 @@ def timelimit_stop():
input_listener_greenlet = gevent.spawn(
input_listener(
{
"w": lambda: runner.spawn_users(1, 100)
"w": lambda: runner.start(runner.user_count + 1, 100)
if runner.state != "spawning"
else logging.warning("Already spawning users, can't spawn more right now"),
"W": lambda: runner.spawn_users(10, 100)
"W": lambda: runner.start(runner.user_count + 10, 100)
if runner.state != "spawning"
else logging.warning("Already spawning users, can't spawn more right now"),
"s": lambda: runner.stop_users(1)
"s": lambda: runner.start(max(0, runner.user_count - 1), 100)
if runner.state != "spawning"
else logging.warning("Spawning users, can't stop right now"),
"S": lambda: runner.stop_users(10)
"S": lambda: runner.start(max(0, runner.user_count - 10), 100)
if runner.state != "spawning"
else logging.warning("Spawning users, can't stop right now"),
}
},
)
)
input_listener_greenlet.link_exception(greenlet_exception_handler)
@@ -403,6 +408,8 @@ def shutdown():
logger.info("Shutting down (exit code %s), bye." % code)
if stats_printer_greenlet is not None:
stats_printer_greenlet.kill(block=False)
if headless_master_greenlet is not None:
headless_master_greenlet.kill(block=False)
logger.info("Cleaning up runner...")
if runner is not None:
runner.quit()
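
The key bindings in this diff now pass an absolute target user count to `runner.start` instead of a number of users to spawn or stop, clamping the target at zero. A minimal sketch of that computation, assuming a hypothetical `KEY_DELTAS` table and `target_user_count` helper that do not exist in the PR:

```python
from typing import Dict

# Hypothetical mapping of key presses to the change in user count,
# mirroring the "w", "W", "s" and "S" bindings in the diff.
KEY_DELTAS: Dict[str, int] = {"w": 1, "W": 10, "s": -1, "S": -10}


def target_user_count(current_user_count: int, key: str) -> int:
    """Return the absolute user count to ramp to, never going below zero."""
    return max(0, current_user_count + KEY_DELTAS[key])


print(target_user_count(5, "W"))
print(target_user_count(5, "S"))
```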