Skip to content

Commit

Permalink
Add test_start and test_stop events.
Browse files Browse the repository at this point in the history
Remove master_start_hatching, master_stop_hatching, locust_start_hatching and locust_stop_hatching because their purpose is very unclear. The documentation does not correspond to how they work, and there are no tests for them. The test_start and test_stop events should probably work as replacement in many cases, and I'm planning to add a user_count_change event that is fired on all worker nodes when users are hatched/killed.
  • Loading branch information
heyman committed Apr 4, 2020
1 parent 7d5f6fa commit 220f9ce
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 64 deletions.
34 changes: 13 additions & 21 deletions locust/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,7 @@ class Events:
*quitting* is fired when the locust process is exiting
"""

master_start_hatching = EventHook
"""
*master_start_hatching* is fired when we initiate the hatching process on the master.
This event is especially useful to detect when the 'start' button is clicked on the web ui.
"""

master_stop_hatching = EventHook
"""
*master_stop_hatching* is fired when terminate the hatching process on the master.
This event is especially useful to detect when the 'stop' button is clicked on the web ui.
"""

locust_start_hatching = EventHook
"""
*locust_start_hatching* is fired when we initiate the hatching process on any locust worker.
"""

locust_stop_hatching = EventHook
"""
*locust_stop_hatching* is fired when terminate the hatching process on any locust worker.
"""

init = EventHook
Expand All @@ -155,6 +134,19 @@ class Events:
* *parser*: ArgumentParser instance
"""

test_start = EventHook
"""
*test_start* is fired when a new load test is started. It's not fired again if the number of
Locust users change during a test. When running locust distributed the event is only fired
on the master node and not on each worker node.
"""

test_stop = EventHook
"""
*test_stop* is fired when a load test is stopped. When running locust distributed the event
is only fired on the master node and not on each worker node.
"""

def __init__(self):
for name, value in vars(type(self)).items():
if value == EventHook:
Expand Down
19 changes: 15 additions & 4 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ def start(self, locust_count, hatch_rate, wait=False):
self.exceptions = {}
self.cpu_warning_emitted = False
self.worker_cpu_warning_emitted = False
self.environment.events.locust_start_hatching.fire()

# Dynamically changing the locust count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
Expand Down Expand Up @@ -248,7 +247,6 @@ def stop(self):
self.kill_locust_instances([g.args[0] for g in self.locusts])
self.state = STATE_STOPPED
self.cpu_log_warning()
self.environment.events.locust_stop_hatching.fire()

def quit(self):
self.stop()
Expand Down Expand Up @@ -278,10 +276,19 @@ def on_locust_error(locust_instance, exception, tb):
def start(self, locust_count, hatch_rate, wait=False):
if hatch_rate > 100:
logger.warning("Your selected hatch rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?")

if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
# if we're not already running we'll fire the test_start event
self.environment.events.test_start.fire(environment=self.environment)

if self.hatching_greenlet:
# kill existing hatching_greenlet before we start a new one
self.hatching_greenlet.kill(block=True)
self.hatching_greenlet = self.greenlet.spawn(lambda: super(LocalLocustRunner, self).start(locust_count, hatch_rate, wait=wait))

def stop(self):
super().stop()
self.environment.events.test_stop.fire(environment=self.environment)


class DistributedLocustRunner(LocustRunner):
Expand Down Expand Up @@ -375,7 +382,7 @@ def start(self, locust_count, hatch_rate):
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
self.stats.clear_all()
self.exceptions = {}
self.environment.events.master_start_hatching.fire()
self.environment.events.test_start.fire(environment=self.environment)

for client in (self.clients.ready + self.clients.running + self.clients.hatching):
data = {
Expand All @@ -397,9 +404,13 @@ def stop(self):
self.state = STATE_STOPPING
for client in self.clients.all:
self.server.send_to_client(Message("stop", None, client.id))
self.environment.events.master_stop_hatching.fire()
self.environment.events.test_stop.fire(environment=self.environment)

def quit(self):
if self.state in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
# fire test_stop event if state isn't already stopped
self.environment.events.test_stop.fire(environment=self.environment)

for client in self.clients.all:
self.server.send_to_client(Message("quit", None, client.id))
gevent.sleep(0.5) # wait for final stats report from all workers
Expand Down
110 changes: 71 additions & 39 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,63 +158,46 @@ def trigger(self):
self.assertTrue(g2.dead)
self.assertTrue(triggered[0])

def test_setup_method_exception(self):
def test_start_event(self):
class User(Locust):
setup_run_count = 0
task_run_count = 0
locust_error_count = 0
wait_time = constant(1)
def setup(self):
User.setup_run_count += 1
raise Exception("some exception")
task_run_count = 0
@task
class task_set(TaskSet):
@task
def my_task(self):
User.task_run_count += 1
def my_task(self):
User.task_run_count += 1

environment = Environment(options=mocked_options())
test_start_run = [0]

def on_locust_error(*args, **kwargs):
User.locust_error_count += 1
environment.events.locust_error.add_listener(on_locust_error)
environment = Environment(options=mocked_options())
def on_test_start(*args, **kwargs):
test_start_run[0] += 1
environment.events.test_start.add_listener(on_test_start)

runner = LocalLocustRunner(environment, locust_classes=[User])
runner.start(locust_count=3, hatch_rate=3, wait=False)
runner.hatching_greenlet.get(timeout=3)

self.assertEqual(1, User.setup_run_count)
self.assertEqual(1, User.locust_error_count)
self.assertEqual(1, test_start_run[0])
self.assertEqual(3, User.task_run_count)

def test_taskset_setup_method_exception(self):
def test_stop_event(self):
class User(Locust):
setup_run_count = 0
task_run_count = 0
locust_error_count = 0
wait_time = constant(1)
@task
class task_set(TaskSet):
def setup(self):
User.setup_run_count += 1
raise Exception("some exception")
@task
def my_task(self):
User.task_run_count += 1

def my_task(self):
pass

test_stop_run = [0]
environment = Environment(options=mocked_options())

def on_locust_error(*args, **kwargs):
User.locust_error_count += 1
environment.events.locust_error.add_listener(on_locust_error)

def on_test_stop(*args, **kwargs):
test_stop_run[0] += 1
environment.events.test_stop.add_listener(on_test_stop)

runner = LocalLocustRunner(environment, locust_classes=[User])
runner.start(locust_count=3, hatch_rate=3, wait=False)
runner.hatching_greenlet.get(timeout=3)

self.assertEqual(1, User.setup_run_count)
self.assertEqual(1, User.locust_error_count)
self.assertEqual(3, User.task_run_count)
self.assertEqual(0, test_stop_run[0])
runner.stop()
self.assertEqual(1, test_stop_run[0])

def test_change_user_count_during_hatching(self):
class User(Locust):
Expand Down Expand Up @@ -505,6 +488,49 @@ def test_sends_hatch_data_to_ready_running_hatching_workers(self):
master.start(locust_count=5,hatch_rate=5)

self.assertEqual(3, len(server.outbox))

def test_start_event(self):
"""
Tests that test_start event is fired
"""
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()

run_count = [0]
@self.environment.events.test_start.add_listener
def on_test_start(*a, **kw):
run_count[0] += 1

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
self.assertEqual(1, run_count[0])

# change number of users and check that test_start isn't fired again
master.start(7, 7)
self.assertEqual(1, run_count[0])

def test_stop_event(self):
"""
Tests that test_stop event is fired
"""
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()

run_count = [0]
@self.environment.events.test_stop.add_listener
def on_test_stop(*a, **kw):
run_count[0] += 1

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
master.stop()
self.assertEqual(1, run_count[0])

def test_spawn_zero_locusts(self):
class MyTaskSet(TaskSet):
Expand Down Expand Up @@ -685,6 +711,10 @@ def the_task(self):

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment(options=mocked_options())
test_start_run = [False]
@environment.events.test_start.add_listener
def on_test_start(**kw):
test_start_run[0] = True
worker = self.get_runner(environment=environment, locust_classes=[MyTestLocust])
self.assertEqual(1, len(client.outbox))
self.assertEqual("client_ready", client.outbox[0].type)
Expand All @@ -707,6 +737,8 @@ def the_task(self):
worker.locusts.join()
# check that locust user got to finish
self.assertEqual(2, MyTestLocust._test_state)
# make sure the test_start was never fired on the worker
self.assertFalse(test_start_run[0])

def test_worker_without_stop_timeout(self):
class MyTestLocust(Locust):
Expand Down

0 comments on commit 220f9ce

Please sign in to comment.