Skip to content

Commit

Permalink
Add --processes parameter: Number of times to fork the locust process…
Browse files Browse the repository at this point in the history
…, to enable using multiple CPU cores. Use -1 to launch one process per CPU core in your system. Combine with --worker flag or let it automatically set --worker and --master flags for an all-in-one-solution.
  • Loading branch information
cyberw committed Nov 18, 2023
1 parent f4288d3 commit 5d631f3
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 2 deletions.
6 changes: 6 additions & 0 deletions locust/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ def setup_parser_arguments(parser):
help="Set locust to run in distributed mode with this process as worker",
env_var="LOCUST_MODE_WORKER",
)
worker_group.add_argument(
"--processes",
type=int,
help="Number of times to fork the locust process, to enable using multiple CPU cores. Use -1 to launch one process per CPU core in your system. Combine with --worker flag or let it automatically set --worker and --master flags for an all-in-one-solution. Not available on Windows. Experimental.",
env_var="LOCUST_PROCESSES",
)
worker_group.add_argument(
"--slave",
action="store_true",
Expand Down
75 changes: 74 additions & 1 deletion locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .input_events import input_listener
from .html import get_html_report
from .util.load_locustfile import load_locustfile
import traceback

try:
# import locust_plugins if it is installed, to allow it to register custom arguments etc
Expand All @@ -37,6 +38,7 @@

version = locust.__version__

first_call = True

# Options to ignore when using a custom shape class without `use_common_options=True`
# See: https://docs.locust.io/en/stable/custom-load-shape.html#use-common-options
Expand Down Expand Up @@ -162,6 +164,78 @@ def is_valid_percentile(parameter):
sys.stderr.write("Invalid --loglevel. Valid values are: DEBUG/INFO/WARNING/ERROR/CRITICAL\n")
sys.exit(1)

children = []

if options.processes:
if os.name == "nt":
raise Exception("--processes is not supported in Windows (except in WSL)")
if options.processes == -1:
options.processes = os.cpu_count()
assert options.processes, "couldnt detect number of cpus!?"
elif options.processes < -1:
raise Exception(f"invalid processes count {options.processes}")
for _ in range(options.processes):
child_pid = gevent.fork()
if child_pid:
children.append(child_pid)
logging.debug(f"Started child worker with pid #{child_pid}")
else:
# child is always a worker, even when it wasnt set on command line
options.worker = True
# remove options that dont make sense on worker
options.run_time = None
options.autostart = None
break
else:
# we're in the parent process
def sigint_handler(_signal, _frame):
# ignore the first sigint in parent, and wait for the children to handle sigint
global first_call
if first_call:
first_call = False
else:
# if parent gets repeated sigint, we kill the children hard
for child_pid in children:
try:
logging.debug(f"Sending SIGKILL to child with pid {child_pid}")
os.kill(child_pid, signal.SIGKILL)
except ProcessLookupError:
pass # process already dead
except:
logging.error(traceback.format_exc())
sys.exit(1)

if options.worker:
signal.signal(signal.SIGINT, sigint_handler)
exit_code = 0
# nothing more to do, just wait for the children to exit
for child_pid in children:
_, child_status = os.waitpid(child_pid, 0)
child_exit_code = os.waitstatus_to_exitcode(child_status)
exit_code = max(exit_code, child_exit_code)
sys.exit(exit_code)
else:
options.master = True
options.expect_workers = options.processes

def kill_workers(children):
exit_code = 0
logging.debug("Sending SIGINT to children")
for child_pid in children:
try:
os.kill(child_pid, signal.SIGINT)
except ProcessLookupError:
pass # never mind, process was already dead
logging.debug("waiting for children to terminate")
for child_pid in children:
_, child_status = os.waitpid(child_pid, 0)
child_exit_code = os.waitstatus_to_exitcode(child_status)
exit_code = max(exit_code, child_exit_code)
if exit_code > 1:
logging.error(f"bad response code from worker children: {exit_code}")

atexit.register(kill_workers, children)

logger = logging.getLogger(__name__)
greenlet_exception_handler = greenlet_exception_logger(logger)

Expand Down Expand Up @@ -486,7 +560,6 @@ def shutdown():
print_stats(runner.stats, current=False)
print_percentile_stats(runner.stats)
print_error_report(runner.stats)

environment.events.quit.fire(exit_code=code)
sys.exit(code)

Expand Down
6 changes: 5 additions & 1 deletion locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,11 @@ def _send_stats(self) -> None:
def connect_to_master(self):
self.retry += 1
self.client.send(Message("client_ready", __version__, self.client_id))
success = self.connection_event.wait(timeout=CONNECT_TIMEOUT)
try:
success = self.connection_event.wait(timeout=CONNECT_TIMEOUT)
except KeyboardInterrupt:
# dont complain about getting CTRL-C
sys.exit(1)
if not success:
if self.retry < 3:
logger.debug(
Expand Down
101 changes: 101 additions & 0 deletions locust/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1727,3 +1727,104 @@ def my_task(self):
for i in range(2):
if found[i] != i:
raise Exception(f"expected index {i} but got", found[i])

def test_processes(self):
with mock_locustfile() as mocked:
command = f"locust -f {mocked.file_path} --processes 4 --headless --run-time 1 --exit-code-on-error 0"
proc = subprocess.Popen(
command,
shell=True,
stdout=PIPE,
stderr=PIPE,
text=True,
)
try:
_, stderr = proc.communicate(timeout=9)
except:
proc.kill()
assert False, f"locust process never finished: {command}"
self.assertNotIn("Traceback", stderr)
self.assertIn("(index 3) reported as ready", stderr)
self.assertIn("Shutting down (exit code 0)", stderr)

def test_processes_autodetect(self):
with mock_locustfile() as mocked:
command = f"locust -f {mocked.file_path} --processes -1 --headless --run-time 1 --exit-code-on-error 0"
proc = subprocess.Popen(
command,
shell=True,
stdout=PIPE,
stderr=PIPE,
text=True,
)
try:
_, stderr = proc.communicate(timeout=9)
except:
proc.kill()
assert False, f"locust process never finished: {command}"
self.assertNotIn("Traceback", stderr)
self.assertIn("(index 0) reported as ready", stderr)
self.assertIn("Shutting down (exit code 0)", stderr)

def test_processes_separate_worker(self):
with mock_locustfile() as mocked:
master_proc = subprocess.Popen(
f"locust -f {mocked.file_path} --master --headless --run-time 1 --exit-code-on-error 0 --expect-workers-max-wait 2",
shell=True,
stdout=PIPE,
stderr=PIPE,
text=True,
)

worker_parent_proc = subprocess.Popen(
f"locust -f {mocked.file_path} --processes 4 --worker",
shell=True,
stdout=PIPE,
stderr=PIPE,
text=True,
)

try:
_, worker_stderr = worker_parent_proc.communicate(timeout=9)
except:
master_proc.kill()
worker_parent_proc.kill()
_, worker_stderr = worker_parent_proc.communicate()
assert False, f"worker never finished: {worker_stderr}"

try:
_, master_stderr = master_proc.communicate(timeout=9)
except:
master_proc.kill()
worker_parent_proc.kill()
_, master_stderr = master_proc.communicate()
assert False, f"master never finished: {master_stderr}"

_, worker_stderr = worker_parent_proc.communicate()
_, master_stderr = master_proc.communicate()
self.assertNotIn("Traceback", worker_stderr)
self.assertNotIn("Traceback", master_stderr)
self.assertNotIn("Gave up waiting for workers to connect", master_stderr)
self.assertIn("(index 3) reported as ready", master_stderr)
self.assertIn("Shutting down (exit code 0)", master_stderr)

def test_processes_ctrl_c(self):
with mock_locustfile() as mocked:
proc = subprocess.Popen(
f"locust -f {mocked.file_path} --processes 4 --headless",
shell=True,
stdout=PIPE,
stderr=PIPE,
text=True,
)
gevent.sleep(2)
proc.send_signal(signal.SIGINT)
try:
_, stderr = proc.communicate(timeout=3)
except:
proc.kill()
_, stderr = proc.communicate()
assert False, f"locust process never finished: {stderr}"
self.assertNotIn("Traceback", stderr)
self.assertIn("The last worker quit, stopping test", stderr)
self.assertIn("Shutting down (exit code 0)", stderr)

0 comments on commit 5d631f3

Please sign in to comment.