Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option (--stop-timeout) to allow tasks to finish running their iteration before exiting #1099

Merged
merged 13 commits into from
Oct 26, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .clients import HttpSession
from .exception import (InterruptTaskSet, LocustError, RescheduleTask,
RescheduleTaskImmediately, StopLocust)
from .runners import STATE_CLEANUP
from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -134,6 +134,7 @@ class Locust(object):
_setup_has_run = False # Internal state to see if we have already run
_teardown_is_set = False # Internal state to see if we have already run
_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once
_state = False

def __init__(self):
super(Locust, self).__init__()
Expand Down Expand Up @@ -369,6 +370,8 @@ def run(self, *args, **kwargs):
try:
self.execute_next_task()
except RescheduleTaskImmediately:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
pass
except RescheduleTask:
self.wait()
Expand Down Expand Up @@ -432,7 +435,11 @@ def get_wait_secs(self):
return millis / 1000.0

def wait(self):
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
self.locust._state = LOCUST_STATE_WAITING
self._sleep(self.get_wait_secs())
self.locust._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)
Expand Down
9 changes: 9 additions & 0 deletions locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ def parse_options():
help="sets the exit code to post on error"
)

parser.add_argument(
'-s', '--stop-timeout',
action='store',
type=int,
dest='stop_timeout',
default=None,
help="number of seconds to wait for a taskset to complete an iteration before exiting. default is to terminate immediately."
cyberw marked this conversation as resolved.
Show resolved Hide resolved
)

parser.add_argument(
'locust_classes',
nargs='*',
Expand Down
17 changes: 14 additions & 3 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopping", "stopped", "missing"]
SLAVE_REPORT_INTERVAL = 3.0

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

class LocustRunner(object):
def __init__(self, locust_classes, options):
Expand Down Expand Up @@ -125,12 +126,13 @@ def hatch():

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust.__name__] += 1
new_locust = locust()
def start_locust(_):
try:
locust().run(runner=self)
new_locust.run(runner=self)
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust, locust)
self.locusts.spawn(start_locust, new_locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
gevent.sleep(sleep_time)
Expand All @@ -151,7 +153,7 @@ def kill_locusts(self, kill_count):
dying = []
for g in self.locusts:
for l in bucket:
if l == g.args[0]:
if l == type(g.args[0]):
dying.append(g)
bucket.remove(l)
break
Expand Down Expand Up @@ -193,6 +195,15 @@ def stop(self):
# if we are currently hatching locusts we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
if self.options.stop_timeout:
for locust_greenlet in self.locusts:
locust = locust_greenlet.args[0]
if locust._state == LOCUST_STATE_WAITING:
locust_greenlet.kill()
else:
locust._state = LOCUST_STATE_STOPPING
if not self.locusts.join(timeout=self.options.stop_timeout):
logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.options.stop_timeout)
self.locusts.kill(block=True)
self.state = STATE_STOPPED
events.locust_stop_hatching.fire()
Expand Down
93 changes: 93 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self):
self.master_bind_port = 5557
self.heartbeat_liveness = 3
self.heartbeat_interval = 0.01
self.stop_timeout = None

def reset_stats(self):
pass
Expand Down Expand Up @@ -422,3 +423,95 @@ def test_message_serialize(self):
self.assertEqual(msg.type, rebuilt.type)
self.assertEqual(msg.data, rebuilt.data)
self.assertEqual(msg.node_id, rebuilt.node_id)

class TestStopTimeout(unittest.TestCase):
def test_stop_timeout(self):
short_time = 0.05
class MyTaskSet(TaskSet):
@task
def my_task(self):
TestStopTimeout.state = "first"
gevent.sleep(short_time)
TestStopTimeout.state = "second" # should only run when run time + stop_timeout is > short_time
gevent.sleep(short_time)
TestStopTimeout.state = "third" # should only run when run time + stop_timeout is > short_time * 2

class MyTestLocust(Locust):
task_set = MyTaskSet

self.options = mocked_options()
runner = LocalLocustRunner([MyTestLocust], self.options)
runner.start_hatching(1, 1)
gevent.sleep(short_time / 2)
runner.quit()
self.assertEqual("first", TestStopTimeout.state)

self.options.stop_timeout = short_time / 2 # exit with timeout
runner = LocalLocustRunner([MyTestLocust], self.options)
runner.start_hatching(1, 1)
gevent.sleep(short_time)
runner.quit()
self.assertEqual("second", TestStopTimeout.state)

self.options.stop_timeout = short_time * 2 # allow task iteration to complete, with some margin
runner = LocalLocustRunner([MyTestLocust], self.options)
runner.start_hatching(1, 1)
gevent.sleep(short_time)
runner.quit()
self.assertEqual("third", TestStopTimeout.state)

def test_stop_timeout_exit_during_wait(self):
short_time = 0.05
class MyTaskSet(TaskSet):
@task
def my_task(self):
pass

class MyTestLocust(Locust):
task_set = MyTaskSet
min_wait = 1000
max_wait = 1000

self.options = mocked_options()
self.options.stop_timeout = short_time
runner = LocalLocustRunner([MyTestLocust], self.options)
runner.start_hatching(1, 1)
gevent.sleep(short_time) # sleep to make sure locust has had time to start waiting
timeout = gevent.Timeout(short_time)
timeout.start()
try:
runner.quit()
runner.greenlet.join()
except gevent.Timeout:
self.fail("Got Timeout exception. Waiting locusts should stop immediately, even when using stop_timeout.")
finally:
timeout.cancel()

def test_stop_timeout_with_interrupt(self):
short_time = 0.05
class MySubTaskSet(TaskSet):
@task
def a_task(self):
gevent.sleep(0)
self.interrupt(reschedule=True)

class MyTaskSet(TaskSet):
tasks = [MySubTaskSet]

class MyTestLocust(Locust):
task_set = MyTaskSet

self.options = mocked_options()
self.options.stop_timeout = short_time
runner = LocalLocustRunner([MyTestLocust], self.options)
runner.start_hatching(1, 1)
gevent.sleep(0)
timeout = gevent.Timeout(short_time)
timeout.start()
try:
runner.quit()
runner.greenlet.join()
except gevent.Timeout:
self.fail("Got Timeout exception. Interrupted locusts should check if they should exit immediately, even when using stop_timeout.")
finally:
timeout.cancel()