forked from SvenskaSpel/locust-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwait_time.py
66 lines (52 loc) · 2.5 KB
/
wait_time.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import logging
import time
from collections import deque, namedtuple
from locust import events, runners
from locust import constant_pacing
from typing import Any
# Module-level pacing state. It is read and updated through ``global`` by the wait-time
# functions that constant_total_ips returns, so all of them share the same values.
_last_run = 0.0  # time.monotonic()-based timestamp assigned to the most recently scheduled iteration
_warning_emitted = False  # set once the "failed to reach target ips" warning has been logged
_target_missed = False  # True when the previous scheduling attempt was already behind schedule
_ips_window = deque()  # an implicitly sorted list of when iterations were started
IPS_WINDOW_SIZE = 20  # length, in seconds, of the sliding window used to measure the achieved rate
@events.quitting.add_listener
def quitting(**_kwargs: Any):
    """Print a notice at shutdown if the missed-target warning was emitted during the test.

    Registered as a listener on the ``quitting`` event; any keyword arguments
    passed by the event are accepted and ignored.
    """
    if not _warning_emitted:
        # The target rate was never reported as missed, so there is nothing to say.
        return
    notice = "Failed to reach targeted number of iterations per second (at some point during the test). Probably caused by target system overload or too few clients"
    print(notice)
# This wait time function calculates when the next iteration should run in order to achieve the desired number of iterations per second (IPS) for the whole locust instance
def constant_total_ips(ips: float):
    """Return a wait-time function that paces the whole locust instance at ``ips`` iterations per second.

    The returned function keeps its schedule in module-level state (``_last_run``,
    ``_ips_window``, ``_target_missed``, ``_warning_emitted``), so the pacing is
    shared by every user that calls it rather than tracked per user.

    Args:
        ips: Target number of iterations per second; must be greater than zero.

    Returns:
        A callable that takes a user and returns the number of seconds to wait
        before its next iteration, or 0 when the schedule is already behind.

    Raises:
        ValueError: If ``ips`` is not greater than zero.
    """
    if ips <= 0:
        # Fail fast: zero would otherwise only surface as a ZeroDivisionError on the first
        # wait, and a negative rate cannot produce meaningful pacing.
        raise ValueError(f"ips must be greater than zero, got {ips!r}")

    # Stand-in runner for single-user debugging. Built lazily and at most once per returned
    # function, instead of creating a new namedtuple class on every call.
    fake_runner = None

    def func(user):
        global _warning_emitted, _target_missed, _last_run
        nonlocal fake_runner
        runner = user.environment.runner
        if runner is None:  # happens when debugging a single locust
            if fake_runner is None:
                fake_runner = namedtuple("FakeRunner", ["target_user_count", "state"])(1, runners.STATE_RUNNING)
            runner = fake_runner
        delay = runner.target_user_count / ips
        current_time = time.monotonic()
        # As the delay calculation will not always be able to compensate (particularly when iteration times differ a lot),
        # we compensate up to 10% for the last IPS_WINDOW_SIZE seconds worth of throughput. This will allow us to temporarily
        # over/undershoot the target rate, but get the right rate over time.
        while _ips_window and _ips_window[0] < current_time - IPS_WINDOW_SIZE:
            _ips_window.popleft()
        previous_rate = len(_ips_window) / IPS_WINDOW_SIZE
        _ips_window.append(current_time)
        rate_diff = previous_rate - ips
        # NOTE(review): the clamp bounds the absolute rate difference to +/-0.1 before dividing
        # by ips, so the relative adjustment is at most 0.1 / ips - confirm this matches the
        # "up to 10%" described above.
        delay = delay * (1 + max(min(rate_diff, 0.1), -0.1) / ips)
        next_time = _last_run + delay
        if current_time > next_time:
            # Behind schedule: do not wait, and restart the schedule from the current time.
            # The warning is logged only when the previous call was also behind schedule
            # and the runner is in the running state.
            if runner.state == runners.STATE_RUNNING and _target_missed and not _warning_emitted:
                logging.warning("Failed to reach target ips, even after rampup has finished")
                _warning_emitted = True  # stop logging
            _target_missed = True
            _last_run = current_time
            return 0
        _target_missed = False
        _last_run = next_time
        return delay

    return func
# inverted versions of common functions
def constant_ips(ips: float):
    """Return a ``constant_pacing`` wait time whose interval is ``1.0 / ips`` seconds.

    Args:
        ips: Rate to convert into a pacing interval; passed through as ``1.0 / ips``.
    """
    seconds_per_iteration = 1.0 / ips
    return constant_pacing(seconds_per_iteration)
def constant_total_pacing(seconds: float):
    """Return a ``constant_total_ips`` wait time for a rate of ``1.0 / seconds`` iterations per second.

    Args:
        seconds: Interval to convert into a rate; passed through as ``1.0 / seconds``.
    """
    target_ips = 1.0 / seconds
    return constant_total_ips(target_ips)