From 3fc0e1425fae17f1bae8a1f35421e9c77f553059 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Wed, 30 Aug 2023 10:34:08 +0000 Subject: [PATCH] Add a test for concurrent builds Fixes #548. --- conda-store-server/tests/test_server.py | 97 +++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/conda-store-server/tests/test_server.py b/conda-store-server/tests/test_server.py index 7040e338a..558849218 100644 --- a/conda-store-server/tests/test_server.py +++ b/conda-store-server/tests/test_server.py @@ -1,5 +1,9 @@ +import datetime +import inspect import json +import time +import celery import pytest import yaml from conda_store_server import __version__, schema @@ -424,6 +428,99 @@ def test_put_build_trigger_build_auth( assert r.status == schema.APIStatus.OK +CREATE_BUILD_DELAY_SECS=30 +CREATE_BUILD_NUM_BUILDS=4 + + +# To test concurrent builds, we overwrite the 'build_conda_environment' +# function, which is used in 'create_build', such that it always takes N +# seconds, with the expectation that M concurrent calls will execute faster than +# M sequential calls. If concurrency works, the test should take roughly N +# seconds, plus some small overhead. If it doesn't work, it should take M * N +# seconds. Since no actual build is performed, we also need to overwrite other +# functions that are called from tasks started by the PUT request for +# 'create_build'. +# +# Note: this is a fixture to have control over fixture execution order. We need +# to run this before celery worker code starts executing. +@pytest.fixture +def mock_create_build(mocker): + def mock_build_conda_environment(*args, **kwargs): + # The print is here to make it easier to see that we're actually calling + # this mock function. + print(f"Running {inspect.currentframe().f_code.co_name}") + # Inserts delay + time.sleep(CREATE_BUILD_DELAY_SECS) + + mocker.patch('conda_store_server.worker.tasks.build_conda_environment', + new=mock_build_conda_environment) + mocker.patch('conda_store_server.worker.tasks.build_conda_env_export', + new=lambda *args, **kwargs: None) + mocker.patch('conda_store_server.worker.tasks.build_conda_pack', + new=lambda *args, **kwargs: None) + mocker.patch('conda_store_server.worker.tasks.build_conda_docker', + new=lambda *args, **kwargs: None) + + +# The following 'parametrize' calls set parameters used by the 'celery_worker' +# fixture, so that it runs concurrently. +# +# Depending on whether code is run as an app or via pytest, celery defaults +# might be different. We need to make sure that parameters responsible for +# concurrent execution are set properly and that they match our app's defaults, +# so that we're testing the same configuration our app would run with. +# +# TODO: This doesn't read values from our app's config file. Instead, the test +# assumes that these are the same as the celery defaults. +# +# With pytest, the defaults in 'start_worker' (as of celery 5.3.1) are: +# concurrency=1 and pool='solo', which means non-concurrent execution. +# +# This shows defaults outside of pytest: +# >>> from celery import Celery +# >>> c = Celery() +# >>> print(c.conf['worker_concurrency']) +# None +# >>> print(c.conf['worker_pool']) +# prefork +# +# This shows all available pools: +# >>> from celery import concurrency +# >>> concurrency.get_available_pool_names() +# ('prefork', 'eventlet', 'gevent', 'solo', 'processes', 'threads', 'custom') +# +# https://stackoverflow.com/questions/66177414/run-celery-tasks-concurrently-using-pytest +# https://docs.pytest.org/en/7.1.x/how-to/fixtures.html#override-a-fixture-with-direct-test-parametrization +@pytest.mark.parametrize("celery_worker_parameters", [{"concurrency": CREATE_BUILD_NUM_BUILDS}]) +@pytest.mark.parametrize("celery_worker_pool", ["prefork"]) +def test_put_build_trigger_build_auth_concurrency( + testclient, seed_conda_store, authenticate, mock_create_build, celery_worker +): + start_time = datetime.datetime.utcnow() + + init_build_id = 1 + tasks = celery.result.ResultSet([]) + for _ in range(CREATE_BUILD_NUM_BUILDS): + # Starts build + response = testclient.put(f"api/v1/build/{init_build_id}") + # Checks that response is OK + r = schema.APIPostSpecification.parse_obj(response.json()) + assert r.status == schema.APIStatus.OK + # Gets task for this build + task = celery_worker.app.AsyncResult(f"build-{r.data.build_id}-environment") + tasks.add(task) + # Waits for tasks to finish + tasks.join() + + # Checks test execution time + end_time = datetime.datetime.utcnow() + time_delta = end_time - start_time + # Makes sure our code is actually executing (at least N seconds) + assert time_delta >= datetime.timedelta(seconds=CREATE_BUILD_DELAY_SECS) + # Makes sure code runs concurrently (N seconds + some overhead) + assert time_delta < datetime.timedelta(seconds=CREATE_BUILD_DELAY_SECS * 2) + + def test_create_namespace_noauth(testclient): namespace = "pytest"