Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uneven distribution of locust classes between running locusts #860

Closed
wants to merge 8 commits into from
66 changes: 66 additions & 0 deletions locust/locusts_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
import math

class _LocustInfo(object):
def __init__(self, locust, ratio):
self.locust = locust
self.ratio = ratio
self.count = 0

def lower_count(self, total_count):
return int(math.floor(self.ratio * total_count))

def upper_count(self, total_count):
return int(math.ceil(self.ratio * total_count))

def miscount(self, total_count):
return total_count * self.ratio - self.count

class LocustsCollection(object):
"""
LocustCollection maintain accurate distribution of locust classes among available locust executors according to the locust weight
Algorithm maintain next invariant:
1. Let p_i = weight_i / sum(weights)
2. Let count_i = p_i * locusts_count
3. Then, for every locust class there is at most two interger points near to count_i: floor(count_i) and ceil(count_i)
4. At every moment each locust class executed by floor(count_i) or ceil(count_i) locust executors
"""
def __init__(self, locust_classes):
total_weight = sum(locust.weight for locust in locust_classes)
self._locusts = [_LocustInfo(locust, locust.weight / float(total_weight))
for locust in locust_classes]
self.size = 0

@property
def classes(self):
return list(map(lambda l: l.locust, self._locusts))

def spawn_locusts(self, spawn_count):
spawned = []
new_size = self.size + spawn_count
for locust in self._locusts:
adjust_spawn_count = max(0, locust.lower_count(new_size) - locust.count)
spawned.extend(self._change_locust_count(locust, adjust_spawn_count))
return spawned + self._make_final_size_adjusment(new_size)

def kill_locusts(self, kill_count):
killed = []
new_size = self.size - kill_count
for locust in self._locusts:
adjust_kill_count = max(0, locust.count - locust.upper_count(new_size))
killed.extend(self._change_locust_count(locust, -adjust_kill_count))
return killed + self._make_final_size_adjusment(new_size)

def _change_locust_count(self, locust, count):
self.size += count
locust.count += count
return [locust.locust for _ in range(abs(count))]

def _make_final_size_adjusment(self, new_size):
adjusted = []
adjusted_size = abs(self.size - new_size)
add_locusts = self.size < new_size
sorted_locusts = sorted(self._locusts, key=lambda l: l.miscount(new_size), reverse=add_locusts)
for locust in list(sorted_locusts)[:adjusted_size]:
adjusted.extend(self._change_locust_count(locust, 1 if add_locusts else -1))
return adjusted
79 changes: 35 additions & 44 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import traceback
import warnings
from hashlib import md5
from collections import Counter
from time import time

import gevent
Expand All @@ -17,6 +18,7 @@
from . import events
from .rpc import Message, rpc
from .stats import global_stats
from .locusts_collection import LocustsCollection

logger = logging.getLogger(__name__)

Expand All @@ -30,7 +32,7 @@
class LocustRunner(object):
def __init__(self, locust_classes, options):
self.options = options
self.locust_classes = locust_classes
self.locusts_collection = LocustsCollection(self.filter_true_locust_classes(locust_classes))
self.hatch_rate = options.hatch_rate
self.num_clients = options.num_clients
self.host = options.host
Expand Down Expand Up @@ -61,65 +63,54 @@ def errors(self):
def user_count(self):
return len(self.locusts)

def weight_locusts(self, amount, stop_timeout = None):
"""
Distributes the amount of locusts for each WebLocust-class according to it's weight
returns a list "bucket" with the weighted locusts
"""
bucket = []
weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))
for locust in self.locust_classes:
def filter_true_locust_classes(self, locust_classes):
true_locust_classes = []
for locust in locust_classes:
if not locust.task_set:
warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)
continue
warnings.warn("Notice: Found Locust class (%s.%s) got no task_set. Skipping..." % (locust.__module__, locust.__name__))
else:
true_locust_classes.append(locust)
return true_locust_classes

def update_locusts_settings(self, locusts, stop_timeout=None):
for locust in self.locusts:
sivukhin marked this conversation as resolved.
Show resolved Hide resolved
if self.host is not None:
locust.host = self.host
if stop_timeout is not None:
locust.stop_timeout = stop_timeout

# create locusts depending on weight
percent = locust.weight / float(weight_sum)
num_locusts = int(round(amount * percent))
bucket.extend([locust for x in xrange(0, num_locusts)])
return bucket
return locusts

def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):
if spawn_count is None:
spawn_count = self.num_clients

bucket = self.weight_locusts(spawn_count, stop_timeout)
spawn_count = len(bucket)
bucket = self.locusts_collection.spawn_locusts(spawn_count)
bucket = self.update_locusts_settings(bucket, stop_timeout)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else:
self.num_clients += spawn_count

logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))
occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])

def hatch():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))
events.hatch_complete.fire(user_count=self.num_clients)
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
def start_locust(_):
try:
locust().run(runner=self)
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
gevent.sleep(sleep_time)

hatch()

sleep_time = 1.0 / self.hatch_rate
random.shuffle(bucket)
for locust in bucket:
def start_locust(_):
try:
locust().run(runner=self)
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
gevent.sleep(sleep_time)

hatched_locusts = Counter(map(lambda l: l.__name__, bucket))
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in hatched_locusts.items()]))
events.hatch_complete.fire(user_count=self.num_clients)

if wait:
self.locusts.join()
logger.info("All locusts dead\n")
Expand All @@ -128,8 +119,8 @@ def kill_locusts(self, kill_count):
"""
Kill a kill_count of weighted locusts from the Group() object in self.locusts
"""
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
bucket = self.locusts_collection.kill_locusts(kill_count)
bucket = self.update_locusts_settings(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
Expand Down
87 changes: 87 additions & 0 deletions locust/test/test_locusts_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import unittest

from locust.locusts_collection import LocustsCollection
from collections import Counter

class MockLocust(object):
def __init__(self, weight):
self.weight = weight
def __str__(self):
return 'locust(w={})'.format(self.weight)
def __repr__(self):
return 'MockLocust({})'.format(self.weight)
class TestLocustsCollection(unittest.TestCase):
def assertEqualUnorderedCollections(self, a, b):
self.assertEqual(Counter(a), Counter(b))

def test_spawn_locusts_1_class(self):
locust = MockLocust(1)
collection = LocustsCollection([locust])
spawned = collection.spawn_locusts(4)
self.assertEqualUnorderedCollections(spawned, [locust, locust, locust, locust])

def test_spawn_locusts_3_classes(self):
l1, l2, l3 = MockLocust(1), MockLocust(1), MockLocust(1)
collection = LocustsCollection([l1, l2, l3])
s1 = collection.spawn_locusts(1)
s2 = collection.spawn_locusts(1)
s3 = collection.spawn_locusts(1)
self.assertEqualUnorderedCollections(s1 + s2 + s3, [l1, l2, l3])

def test_kill_locusts_1_class(self):
locust = MockLocust(1)
collection = LocustsCollection([locust])
spawned = collection.spawn_locusts(4)
killed1 = collection.kill_locusts(3)
killed2 = collection.kill_locusts(1)
self.assertEqualUnorderedCollections(killed1, [locust, locust, locust])
self.assertEqualUnorderedCollections(killed2, [locust])

def test_kill_locusts_3_classes(self):
l1, l2, l3 = MockLocust(1), MockLocust(1), MockLocust(1)
collection = LocustsCollection([l1, l2, l3])
spawned = collection.spawn_locusts(3)
k1 = collection.kill_locusts(1)
k2 = collection.kill_locusts(1)
k3 = collection.kill_locusts(1)
self.assertEqualUnorderedCollections(k1 + k2 + k3, [l1, l2, l3])

def test_spawn_complex_weight_distribution(self):
l1, l3, l5 = MockLocust(1), MockLocust(3), MockLocust(5)
collection = LocustsCollection([l1, l3, l5])
s1 = collection.spawn_locusts(1)
s2 = collection.spawn_locusts(3)
s3 = collection.spawn_locusts(5)
self.assertEqualUnorderedCollections(s1, [l5])
self.assertEqualUnorderedCollections(s2, [l1, l3, l5])
self.assertEqualUnorderedCollections(s3, [l3, l3, l5, l5, l5])

def test_kill_complex_weight_distribution(self):
l1, l3, l5 = MockLocust(1), MockLocust(3), MockLocust(5)
collection = LocustsCollection([l1, l3, l5])
spawned = collection.spawn_locusts(9)
k1 = collection.kill_locusts(1)
k2 = collection.kill_locusts(3)
k3 = collection.kill_locusts(5)
self.assertEqualUnorderedCollections(k1, [l5])
self.assertEqualUnorderedCollections(k2, [l1, l3, l5])
self.assertEqualUnorderedCollections(k3, [l3, l3, l5, l5, l5])

def test_spawn_and_kill(self):
l1, l3, l5 = MockLocust(1), MockLocust(3), MockLocust(5)
collection = LocustsCollection([l1, l3, l5])
s1 = collection.spawn_locusts(4)
k1 = collection.kill_locusts(1)
k2 = collection.kill_locusts(2)
s2 = collection.spawn_locusts(3)
self.assertEqualUnorderedCollections(s1, [l1, l3, l5, l5])
self.assertEqualUnorderedCollections(k1, [l1])
self.assertEqualUnorderedCollections(k2, [l3, l5])
self.assertEqualUnorderedCollections(s2, [l1, l3, l5])

def test_spawn_many_locusts(self):
l1, l3, l5 = MockLocust(2), MockLocust(3), MockLocust(5)
collection = LocustsCollection([l1, l3, l5])
spawned = collection.spawn_locusts(100)
expected = [l1 for _ in range(20)] + [l3 for _ in range(30)] + [l5 for _ in range(50)]
self.assertEqualUnorderedCollections(spawned, expected)
Loading