-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decouple Runner and Locust code by introducing Locust.start and Locust.stop methods #1306
Changes from 3 commits
4b84b10
fa91efa
a896a01
c15eead
27151cd
07d1ee6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,15 @@ | |
from .clients import HttpSession | ||
from .exception import (InterruptTaskSet, LocustError, RescheduleTask, | ||
RescheduleTaskImmediately, StopLocust, MissingWaitTimeError) | ||
from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING | ||
from .util import deprecation | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"] | ||
|
||
|
||
def task(weight=1): | ||
""" | ||
Used as a convenience decorator to be able to declare tasks for a Locust or a TaskSet | ||
|
@@ -272,14 +274,9 @@ def run(self, *args, **kwargs): | |
self.schedule_task(self.get_next_task()) | ||
|
||
try: | ||
if self.locust._state == LOCUST_STATE_STOPPING: | ||
raise GreenletExit() | ||
self._check_stop_condition() | ||
self.execute_next_task() | ||
if self.locust._state == LOCUST_STATE_STOPPING: | ||
raise GreenletExit() | ||
except RescheduleTaskImmediately: | ||
if self.locust._state == LOCUST_STATE_STOPPING: | ||
raise GreenletExit() | ||
pass | ||
except RescheduleTask: | ||
self.wait() | ||
|
@@ -361,13 +358,19 @@ class Tasks(TaskSet): | |
)) | ||
|
||
def wait(self): | ||
self._check_stop_condition() | ||
self.locust._state = LOCUST_STATE_WAITING | ||
self._sleep(self.wait_time()) | ||
self._check_stop_condition() | ||
self.locust._state = LOCUST_STATE_RUNNING | ||
|
||
def _sleep(self, seconds): | ||
gevent.sleep(seconds) | ||
|
||
def _check_stop_condition(self): | ||
if self.locust._state == LOCUST_STATE_STOPPING: | ||
raise StopLocust() | ||
|
||
def interrupt(self, reschedule=True): | ||
""" | ||
Interrupt the TaskSet and hand over execution control back to the parent TaskSet. | ||
|
@@ -527,7 +530,8 @@ class ForumPage(TaskSet): | |
_setup_has_run = False # Internal state to see if we have already run | ||
_teardown_is_set = False # Internal state to see if we have already run | ||
_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once | ||
_state = False | ||
_state = None | ||
_greenlet = None | ||
|
||
def __init__(self, environment): | ||
super(Locust, self).__init__() | ||
|
@@ -556,21 +560,59 @@ def _set_setup_flag(cls): | |
def _set_teardown_flag(cls): | ||
cls._teardown_is_set = True | ||
|
||
def run(self, runner=None): | ||
def run(self): | ||
self._state = LOCUST_STATE_RUNNING | ||
task_set_instance = self._task_set(self) | ||
try: | ||
if hasattr(self, "on_start"): | ||
# run the task_set on_start method, if it has one | ||
self.on_start() | ||
task_set_instance.run() | ||
except StopLocust: | ||
pass | ||
except GreenletExit as e: | ||
if runner: | ||
runner.state = STATE_CLEANUP | ||
# Run the task_set on_stop method, if it has one | ||
if hasattr(task_set_instance, "on_stop"): | ||
task_set_instance.on_stop() | ||
raise # Maybe something relies on this except being raised? | ||
except (GreenletExit, StopLocust) as e: | ||
if hasattr(self, "on_stop"): | ||
# run the task_set on_stop method, if it has one | ||
self.on_stop() | ||
|
||
def start(self, gevent_group): | ||
""" | ||
Start a greenlet that runs this locust instance. | ||
|
||
*Arguments*: | ||
|
||
* gevent_group: gevent.pool.Group instance where the greenlet will be spawned. | ||
|
||
Returns the spawned greenlet. | ||
""" | ||
def run_locust(user): | ||
""" | ||
Main function gor Locust user greenlet. It's important that this function takes the locust | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gor? :P There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, will fix :) |
||
instance as argument, since we use greenlet_instance.args[0] to retrieve a reference to the | ||
locust instance. | ||
""" | ||
user.run() | ||
self._greenlet = gevent_group.spawn(run_locust, self) | ||
return self._greenlet | ||
|
||
def stop(self, gevent_group, force=False): | ||
""" | ||
Stop the locust user greenlet that exists in the gevent_group. | ||
This method is not meant to be called from within of the Locust's greenlet. | ||
|
||
*Arguments*: | ||
|
||
* gevent_group: gevent.pool.Group instance where the greenlet will be spawned. | ||
* force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING | ||
which will make the Locust instance stop once any currently running task is complete and on_stop | ||
methods are called. If force is True the greenlet will be killed immediately. | ||
|
||
Returns True if the greenlet was killed immediately, otherwise False | ||
""" | ||
if force or self._state == LOCUST_STATE_WAITING: | ||
gevent_group.killone(self._greenlet) | ||
return True | ||
elif self._state == LOCUST_STATE_RUNNING: | ||
self._state = LOCUST_STATE_STOPPING | ||
return False | ||
|
||
|
||
class HttpLocust(Locust): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,6 @@ | |
|
||
import gevent | ||
import psutil | ||
from gevent import GreenletExit | ||
from gevent.pool import Group | ||
|
||
from .rpc import Message, rpc | ||
|
@@ -24,7 +23,6 @@ | |
HEARTBEAT_INTERVAL = 1 | ||
HEARTBEAT_LIVENESS = 3 | ||
|
||
LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"] | ||
|
||
class LocustRunner(object): | ||
def __init__(self, environment, locust_classes): | ||
|
@@ -135,15 +133,10 @@ def hatch(): | |
self.environment.events.hatch_complete.fire(user_count=len(self.locusts)) | ||
return | ||
|
||
locust = bucket.pop(random.randint(0, len(bucket)-1)) | ||
occurrence_count[locust.__name__] += 1 | ||
new_locust = locust(self.environment) | ||
def start_locust(_): | ||
try: | ||
new_locust.run(runner=self) | ||
except GreenletExit: | ||
pass | ||
self.locusts.spawn(start_locust, new_locust) | ||
locust_class = bucket.pop(random.randint(0, len(bucket)-1)) | ||
occurrence_count[locust_class.__name__] += 1 | ||
new_locust = locust_class(self.environment) | ||
new_locust.start(self.locusts) | ||
if len(self.locusts) % 10 == 0: | ||
logger.debug("%i locusts hatched" % len(self.locusts)) | ||
if bucket: | ||
|
@@ -161,36 +154,32 @@ def kill_locusts(self, kill_count): | |
bucket = self.weight_locusts(kill_count) | ||
kill_count = len(bucket) | ||
logger.info("Killing %i locusts" % kill_count) | ||
dying = [] | ||
to_kill = [] | ||
for g in self.locusts: | ||
for l in bucket: | ||
if l == type(g.args[0]): | ||
dying.append(g) | ||
user = g.args[0] | ||
if l == type(user): | ||
to_kill.append(user) | ||
bucket.remove(l) | ||
break | ||
self.kill_locust_greenlets(dying) | ||
self.kill_locust_instances(to_kill) | ||
self.environment.events.hatch_complete.fire(user_count=self.user_count) | ||
|
||
def kill_locust_greenlets(self, greenlets): | ||
""" | ||
Kill running locust greenlets. If environment.stop_timeout is set, we try to stop the | ||
Locust users gracefully | ||
""" | ||
|
||
def kill_locust_instances(self, users): | ||
if self.environment.stop_timeout: | ||
dying = Group() | ||
for g in greenlets: | ||
locust = g.args[0] | ||
if locust._state == LOCUST_STATE_WAITING: | ||
self.locusts.killone(g) | ||
else: | ||
locust._state = LOCUST_STATE_STOPPING | ||
dying.add(g) | ||
for user in users: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you're using "user/s" instead of "locust/s" because we're renaming it? it is confusing when the comments talk about locusts but the code about users :P There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I figured we're renaming it, and even before we've done that I don't think |
||
if not user.stop(self.locusts, force=False): | ||
# Locust.stop() returns False if the greenlet was not killed, so we'll need | ||
# to add it's greenlet to our dying Group so we can wait for it to finish it's task | ||
dying.add(user._greenlet) | ||
if not dying.join(timeout=self.environment.stop_timeout): | ||
logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.environment.stop_timeout) | ||
dying.kill(block=True) | ||
else: | ||
for g in greenlets: | ||
self.locusts.killone(g) | ||
for user in users: | ||
user.stop(self.locusts, force=True) | ||
|
||
def monitor_cpu(self): | ||
process = psutil.Process() | ||
|
@@ -252,10 +241,11 @@ def stepload_worker(self, hatch_rate, step_clients_growth, step_duration): | |
gevent.sleep(step_duration) | ||
|
||
def stop(self): | ||
self.state = STATE_CLEANUP | ||
# if we are currently hatching locusts we need to kill the hatching greenlet first | ||
if self.hatching_greenlet and not self.hatching_greenlet.ready(): | ||
self.hatching_greenlet.kill(block=True) | ||
self.kill_locust_greenlets([g for g in self.locusts]) | ||
self.kill_locust_instances([g.args[0] for g in self.locusts]) | ||
self.state = STATE_STOPPED | ||
self.cpu_log_warning() | ||
self.environment.events.locust_stop_hatching.fire() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be possible to define an on_start/on_stop method on the TaskSet base class that does nothing? Instead of checking whether they exist...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that might be better. I guess that any performance difference should be negligible (haven't measure so I don't even know which one is fastest).