Skip to content

Commit

Permalink
Add a test for concurrent builds
Browse files Browse the repository at this point in the history
Fixes #548.
  • Loading branch information
Nikita Karetnikov committed Sep 26, 2023
1 parent 64552a2 commit 3fc0e14
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions conda-store-server/tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import datetime
import inspect
import json
import time

import celery
import pytest
import yaml
from conda_store_server import __version__, schema
Expand Down Expand Up @@ -424,6 +428,99 @@ def test_put_build_trigger_build_auth(
assert r.status == schema.APIStatus.OK


CREATE_BUILD_DELAY_SECS=30
CREATE_BUILD_NUM_BUILDS=4


# To test concurrent builds, we overwrite the 'build_conda_environment'
# function, which is used in 'create_build', such that it always takes N
# seconds, with the expectation that M concurrent calls will execute faster than
# M sequential calls. If concurrency works, the test should take roughly N
# seconds, plus some small overhead. If it doesn't work, it should take M * N
# seconds. Since no actual build is performed, we also need to overwrite other
# functions that are called from tasks started by the PUT request for
# 'create_build'.
#
# Note: this is a fixture to have control over fixture execution order. We need
# to run this before celery worker code starts executing.
@pytest.fixture
def mock_create_build(mocker):
def mock_build_conda_environment(*args, **kwargs):
# The print is here to make it easier to see that we're actually calling
# this mock function.
print(f"Running {inspect.currentframe().f_code.co_name}")
# Inserts delay
time.sleep(CREATE_BUILD_DELAY_SECS)

mocker.patch('conda_store_server.worker.tasks.build_conda_environment',
new=mock_build_conda_environment)
mocker.patch('conda_store_server.worker.tasks.build_conda_env_export',
new=lambda *args, **kwargs: None)
mocker.patch('conda_store_server.worker.tasks.build_conda_pack',
new=lambda *args, **kwargs: None)
mocker.patch('conda_store_server.worker.tasks.build_conda_docker',
new=lambda *args, **kwargs: None)


# The following 'parametrize' calls set parameters used by the 'celery_worker'
# fixture, so that it runs concurrently.
#
# Depending on whether code is run as an app or via pytest, celery defaults
# might be different. We need to make sure that parameters responsible for
# concurrent execution are set properly and that they match our app's defaults,
# so that we're testing the same configuration our app would run with.
#
# TODO: This doesn't read values from our app's config file. Instead, the test
# assumes that these are the same as the celery defaults.
#
# With pytest, the defaults in 'start_worker' (as of celery 5.3.1) are:
# concurrency=1 and pool='solo', which means non-concurrent execution.
#
# This shows defaults outside of pytest:
# >>> from celery import Celery
# >>> c = Celery()
# >>> print(c.conf['worker_concurrency'])
# None
# >>> print(c.conf['worker_pool'])
# prefork
#
# This shows all available pools:
# >>> from celery import concurrency
# >>> concurrency.get_available_pool_names()
# ('prefork', 'eventlet', 'gevent', 'solo', 'processes', 'threads', 'custom')
#
# https://stackoverflow.com/questions/66177414/run-celery-tasks-concurrently-using-pytest
# https://docs.pytest.org/en/7.1.x/how-to/fixtures.html#override-a-fixture-with-direct-test-parametrization
@pytest.mark.parametrize("celery_worker_parameters", [{"concurrency": CREATE_BUILD_NUM_BUILDS}])
@pytest.mark.parametrize("celery_worker_pool", ["prefork"])
def test_put_build_trigger_build_auth_concurrency(
testclient, seed_conda_store, authenticate, mock_create_build, celery_worker
):
start_time = datetime.datetime.utcnow()

init_build_id = 1
tasks = celery.result.ResultSet([])
for _ in range(CREATE_BUILD_NUM_BUILDS):
# Starts build
response = testclient.put(f"api/v1/build/{init_build_id}")
# Checks that response is OK
r = schema.APIPostSpecification.parse_obj(response.json())
assert r.status == schema.APIStatus.OK
# Gets task for this build
task = celery_worker.app.AsyncResult(f"build-{r.data.build_id}-environment")
tasks.add(task)
# Waits for tasks to finish
tasks.join()

# Checks test execution time
end_time = datetime.datetime.utcnow()
time_delta = end_time - start_time
# Makes sure our code is actually executing (at least N seconds)
assert time_delta >= datetime.timedelta(seconds=CREATE_BUILD_DELAY_SECS)
# Makes sure code runs concurrently (N seconds + some overhead)
assert time_delta < datetime.timedelta(seconds=CREATE_BUILD_DELAY_SECS * 2)


def test_create_namespace_noauth(testclient):
namespace = "pytest"

Expand Down

0 comments on commit 3fc0e14

Please sign in to comment.