From f04760ef05e2f5352d5795bad65a783fe124cdc3 Mon Sep 17 00:00:00 2001 From: hugo Date: Thu, 1 Aug 2024 16:26:07 +0200 Subject: [PATCH] Move agent code loader setup to InProcessExecutorManager (Issue #7589, PR #7867) # Description closes https://github.com/inmanta/inmanta-core/issues/7589 # Self Check: Strike through any lines that are not applicable (`~~line~~`) then check the box - [x] Attached issue to pull request - [x] Changelog entry - [x] Type annotations are present - [x] Code is clear and sufficiently documented - [x] No (preventable) type errors (check using make mypy or make mypy-diff) - [ ] Sufficient test cases (reproduces the bug/tests the requested feature) ~~- [ ] Correct, in line with design~~ ~~- [ ] End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )~~ ~~- [ ] If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~~ --- ...ader-setup-to-InProcessExecutorManager.yml | 4 + src/inmanta/agent/agent.py | 112 +++--------------- src/inmanta/agent/config.py | 2 +- src/inmanta/agent/in_process_executor.py | 88 +++++++++++++- tests/agent_server/test_agent_code_loading.py | 49 ++++++-- 5 files changed, 144 insertions(+), 111 deletions(-) create mode 100644 changelogs/unreleased/7589-move-agent-code-loader-setup-to-InProcessExecutorManager.yml diff --git a/changelogs/unreleased/7589-move-agent-code-loader-setup-to-InProcessExecutorManager.yml b/changelogs/unreleased/7589-move-agent-code-loader-setup-to-InProcessExecutorManager.yml new file mode 100644 index 0000000000..90c863af01 --- /dev/null +++ b/changelogs/unreleased/7589-move-agent-code-loader-setup-to-InProcessExecutorManager.yml @@ -0,0 +1,4 @@ +description: Move agent code loader setup to InProcessExecutorManager +change-type: patch +issue-nr: 7589 +destination-branches: [master] diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 2e4a3a4800..8a0efb6dc4 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -26,17 +26,13 @@ import random import time import uuid -from asyncio import Lock -from collections import defaultdict from collections.abc import Callable, Coroutine, Iterable, Mapping, Sequence from concurrent.futures.thread import ThreadPoolExecutor from logging import Logger from typing import Any, Collection, Dict, Optional, Union, cast -import pkg_resources - import inmanta.agent.executor -from inmanta import config, const, data, env, protocol +from inmanta import config, const, data, protocol from inmanta.agent import config as cfg from inmanta.agent import executor, forking_executor, in_process_executor from inmanta.agent.executor import ResourceDetails, ResourceInstallSpec @@ -49,20 +45,11 @@ ResourceType, ResourceVersionIdStr, ) -from inmanta.loader import CodeLoader, ModuleSource +from inmanta.loader import ModuleSource from inmanta.protocol import SessionEndpoint, SyncClient, methods, methods_v2 from inmanta.resources import Id from inmanta.types import Apireturn, JsonType -from inmanta.util import ( - CronSchedule, - IntervalSchedule, - NamedLock, - ScheduledTask, - TaskMethod, - TaskSchedule, - add_future, - join_threadpools, -) +from inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, TaskMethod, TaskSchedule, add_future, join_threadpools LOGGER = logging.getLogger(__name__) @@ -972,27 +959,18 @@ def __init__( self._instances: dict[str, AgentInstance] = {} self._instances_lock = asyncio.Lock() - self._loader: Optional[CodeLoader] = None - self._env: Optional[env.VirtualEnv] = None - if code_loader: - # all of this should go into the executor manager https://github.com/inmanta/inmanta-core/issues/7589 - self._env = env.VirtualEnv(self._storage["env"]) - self._env.use_virtual_env() - self._loader = CodeLoader(self._storage["code"], clean=True) - # Lock to ensure only one actual install runs at a time - self._loader_lock = Lock() - # Keep track for each resource type of the last loaded version - self._last_loaded_version: dict[str, executor.ExecutorBlueprint | None] = defaultdict(lambda: None) - # Cache to prevent re-fetching the same resource-version - self._previously_loaded: dict[tuple[str, int], ResourceInstallSpec] = {} - # Per-resource lock to serialize all actions per resource - self._resource_loader_lock = NamedLock() + # Cache to prevent re-fetching the same resource-version + self._code_cache: dict[tuple[str, int], ResourceInstallSpec] = {} self.agent_map: Optional[dict[str, str]] = agent_map remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExecutorMode.forking can_have_remote_executor = code_loader + # Mechanism to speed up tests using the old (<= iso7) agent mechanism + # by avoiding spawning a virtual environment. + self._code_loader = code_loader + self.executor_manager: executor.ExecutorManager[executor.Executor] if remote_executor and can_have_remote_executor: LOGGER.info("Selected forking agent executor mode") @@ -1016,6 +994,9 @@ def __init__( asyncio.get_event_loop(), LOGGER, self, + self._storage["code"], + self._storage["env"], + code_loader, ) async def _init_agent_map(self) -> None: @@ -1225,7 +1206,7 @@ async def get_code( - collection of ResourceInstallSpec for resource_types with valid handler code and pip config - set of invalid resource_types (no handler code and/or invalid pip config) """ - if self._loader is None: + if not self._code_loader: return [], set() # store it outside the loop, but only load when required @@ -1234,7 +1215,7 @@ async def get_code( resource_install_specs: list[ResourceInstallSpec] = [] invalid_resource_types: executor.FailedResourcesSet = set() for resource_type in set(resource_types): - cached_spec: Optional[ResourceInstallSpec] = self._previously_loaded.get((resource_type, version)) + cached_spec: Optional[ResourceInstallSpec] = self._code_cache.get((resource_type, version)) if cached_spec: LOGGER.debug( "Cache hit, using existing ResourceInstallSpec for resource_type=%s version=%d", resource_type, version @@ -1270,10 +1251,10 @@ async def get_code( resource_type, version, executor.ExecutorBlueprint(pip_config, list(requirements), sources) ) resource_install_specs.append(resource_install_spec) - # Update the ``_previously_loaded`` cache to indicate that the given resource type's ResourceInstallSpec + # Update the ``_code_cache`` cache to indicate that the given resource type's ResourceInstallSpec # was constructed successfully at the specified version. # TODO: this cache is a slight memory leak - self._previously_loaded[(resource_type, version)] = resource_install_spec + self._code_cache[(resource_type, version)] = resource_install_spec else: LOGGER.error( "Failed to get source code for %s version=%d\n%s", @@ -1285,67 +1266,6 @@ async def get_code( return resource_install_specs, invalid_resource_types - async def ensure_code(self, code: Collection[ResourceInstallSpec]) -> executor.FailedResourcesSet: - """Ensure that the code for the given environment and version is loaded""" - - failed_to_load: executor.FailedResourcesSet = set() - - if self._loader is None: - return failed_to_load - - for resource_install_spec in code: - # only one logical thread can load a particular resource type at any time - async with self._resource_loader_lock.get(resource_install_spec.resource_type): - # stop if the last successful load was this one - # The combination of the lock and this check causes the reloads to naturally 'batch up' - if self._last_loaded_version[resource_install_spec.resource_type] == resource_install_spec.blueprint: - LOGGER.debug( - "Handler code already installed for %s version=%d", - resource_install_spec.resource_type, - resource_install_spec.model_version, - ) - continue - - try: - # Install required python packages and the list of ``ModuleSource`` with the provided pip config - LOGGER.debug( - "Installing handler %s version=%d", - resource_install_spec.resource_type, - resource_install_spec.model_version, - ) - await self._install(resource_install_spec.blueprint) - LOGGER.debug( - "Installed handler %s version=%d", - resource_install_spec.resource_type, - resource_install_spec.model_version, - ) - - self._last_loaded_version[resource_install_spec.resource_type] = resource_install_spec.blueprint - except Exception: - LOGGER.exception( - "Failed to install handler %s version=%d", - resource_install_spec.resource_type, - resource_install_spec.model_version, - ) - failed_to_load.add(resource_install_spec.resource_type) - self._last_loaded_version[resource_install_spec.resource_type] = None - - return failed_to_load - - async def _install(self, blueprint: executor.ExecutorBlueprint) -> None: - if self._env is None or self._loader is None: - raise Exception("Unable to load code when agent is started with code loading disabled.") - - async with self._loader_lock: - loop = asyncio.get_running_loop() - await loop.run_in_executor( - self.thread_pool, - self._env.install_for_config, - list(pkg_resources.parse_requirements(blueprint.requirements)), - blueprint.pip_config, - ) - await loop.run_in_executor(self.thread_pool, self._loader.deploy_version, blueprint.sources) - async def _get_pip_config(self, environment: uuid.UUID, version: int) -> PipConfig: response = await self._client.get_pip_config(tid=environment, version=version) if response.code != 200: diff --git a/src/inmanta/agent/config.py b/src/inmanta/agent/config.py index 68713587ed..cd617e4b9a 100644 --- a/src/inmanta/agent/config.py +++ b/src/inmanta/agent/config.py @@ -46,7 +46,7 @@ "config", "use_autostart_agent_map", False, - """If this option is set to true, the agent-map of this agent will be set the the autostart_agent_map configured on the + """If this option is set to true, the agent-map of this agent will be set to the autostart_agent_map configured on the server. The agent_map will be kept up-to-date automatically.""", is_bool, ) diff --git a/src/inmanta/agent/in_process_executor.py b/src/inmanta/agent/in_process_executor.py index 372af2fb21..349186f2f6 100644 --- a/src/inmanta/agent/in_process_executor.py +++ b/src/inmanta/agent/in_process_executor.py @@ -17,23 +17,29 @@ import logging import typing import uuid +from asyncio import Lock +from collections import defaultdict from collections.abc import Sequence from concurrent.futures.thread import ThreadPoolExecutor from typing import Any, Optional +import pkg_resources + import inmanta.agent.cache import inmanta.protocol import inmanta.util import logfire -from inmanta import const, data +from inmanta import const, data, env from inmanta.agent import executor, handler from inmanta.agent.executor import FailedResourcesSet, ResourceDetails from inmanta.agent.handler import HandlerAPI, SkipResource from inmanta.agent.io.remote import ChannelClosedException from inmanta.const import ParameterSource from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr +from inmanta.loader import CodeLoader from inmanta.resources import Id, Resource from inmanta.types import Apireturn +from inmanta.util import NamedLock if typing.TYPE_CHECKING: import inmanta.agent.agent as agent @@ -446,6 +452,9 @@ def __init__( eventloop: asyncio.AbstractEventLoop, parent_logger: logging.Logger, process: "agent.Agent", + code_dir: str, + env_dir: str, + code_loader: bool = True, ) -> None: self.environment = environment self.client = client @@ -456,6 +465,20 @@ def __init__( self.executors: dict[str, InProcessExecutor] = {} self._creation_locks: inmanta.util.NamedLock = inmanta.util.NamedLock() + self._loader: CodeLoader | None = None + self._env: env.VirtualEnv | None = None + + if code_loader: + self._env = env.VirtualEnv(env_dir) + self._env.use_virtual_env() + self._loader = CodeLoader(code_dir, clean=True) + # Lock to ensure only one actual install runs at a time + self._loader_lock: asyncio.Lock = Lock() + # Keep track for each resource type of the last loaded version + self._last_loaded_version: dict[str, executor.ExecutorBlueprint | None] = defaultdict(lambda: None) + # Per-resource lock to serialize all actions per resource + self._resource_loader_lock: NamedLock = NamedLock() + async def stop(self) -> None: for child in self.executors.values(): child.stop() @@ -499,7 +522,68 @@ async def get_executor( out = InProcessExecutor(agent_name, agent_uri, self.environment, self.client, self.eventloop, self.logger) self.executors[agent_name] = out assert out.uri == agent_uri - failed_resource_types: FailedResourcesSet = await self.process.ensure_code(code) + failed_resource_types: FailedResourcesSet = await self.ensure_code(code) out.failed_resource_types = failed_resource_types return out + + async def ensure_code(self, code: typing.Collection[executor.ResourceInstallSpec]) -> executor.FailedResourcesSet: + """Ensure that the code for the given environment and version is loaded""" + + failed_to_load: executor.FailedResourcesSet = set() + + if self._loader is None: + return failed_to_load + + for resource_install_spec in code: + # only one logical thread can load a particular resource type at any time + async with self._resource_loader_lock.get(resource_install_spec.resource_type): + # stop if the last successful load was this one + # The combination of the lock and this check causes the reloads to naturally 'batch up' + if self._last_loaded_version[resource_install_spec.resource_type] == resource_install_spec.blueprint: + self.logger.debug( + "Handler code already installed for %s version=%d", + resource_install_spec.resource_type, + resource_install_spec.model_version, + ) + continue + + try: + # Install required python packages and the list of ``ModuleSource`` with the provided pip config + self.logger.debug( + "Installing handler %s version=%d", + resource_install_spec.resource_type, + resource_install_spec.model_version, + ) + await self._install(resource_install_spec.blueprint) + self.logger.debug( + "Installed handler %s version=%d", + resource_install_spec.resource_type, + resource_install_spec.model_version, + ) + + self._last_loaded_version[resource_install_spec.resource_type] = resource_install_spec.blueprint + except Exception: + self.logger.exception( + "Failed to install handler %s version=%d", + resource_install_spec.resource_type, + resource_install_spec.model_version, + ) + failed_to_load.add(resource_install_spec.resource_type) + self._last_loaded_version[resource_install_spec.resource_type] = None + + return failed_to_load + + async def _install(self, blueprint: executor.ExecutorBlueprint) -> None: + if self._env is None or self._loader is None: + raise Exception("Unable to load code when agent is started with code loading disabled.") + + async with self._loader_lock: + loop = asyncio.get_running_loop() + await loop.run_in_executor( + self.process.thread_pool, + self._env.install_for_config, + list(pkg_resources.parse_requirements(blueprint.requirements)), + blueprint.pip_config, + ) + await loop.run_in_executor(self.process.thread_pool, self._loader.deploy_version, blueprint.sources) diff --git a/tests/agent_server/test_agent_code_loading.py b/tests/agent_server/test_agent_code_loading.py index 50a282bb42..b93790d2ef 100644 --- a/tests/agent_server/test_agent_code_loading.py +++ b/tests/agent_server/test_agent_code_loading.py @@ -22,11 +22,13 @@ import py_compile import tempfile import uuid +from collections.abc import Sequence from logging import DEBUG, INFO import pytest import inmanta +from inmanta import config from inmanta.agent import Agent from inmanta.agent.executor import ResourceInstallSpec from inmanta.data import PipConfig @@ -71,8 +73,15 @@ async def make_source_structure( return hv +@pytest.fixture(scope="function") +def server_pre_start(server_config, request): + config.Config.set("agent", "executor-mode", request.param) + yield config + + +@pytest.mark.parametrize("server_pre_start", ["threaded"], indirect=True) async def test_agent_code_loading( - caplog, server, agent_factory, client, environment: uuid.UUID, monkeypatch, clienthelper + server_pre_start, caplog, server, agent_factory, client, environment: uuid.UUID, monkeypatch, clienthelper ) -> None: """ Test goals: @@ -181,19 +190,19 @@ async def get_version() -> int: resource_install_specs_1, _ = await agent.get_code( environment=environment, version=version_1, resource_types=["test::Test", "test::Test2", "test::Test3"] ) - await agent.ensure_code( + await agent.executor_manager.ensure_code( code=resource_install_specs_1, ) resource_install_specs_2, _ = await agent.get_code( environment=environment, version=version_1, resource_types=["test::Test", "test::Test2"] ) - await agent.ensure_code( + await agent.executor_manager.ensure_code( code=resource_install_specs_2, ) resource_install_specs_3, _ = await agent.get_code( environment=environment, version=version_2, resource_types=["test::Test2"] ) - await agent.ensure_code( + await agent.executor_manager.ensure_code( code=resource_install_specs_3, ) @@ -228,7 +237,7 @@ async def get_version() -> int: environment=environment, version=version_2, resource_types=["test::Test3"] ) # Install sources2 - await agent.ensure_code(code=resource_install_specs_4) + await agent.executor_manager.ensure_code(code=resource_install_specs_4) # Test 3 is deployed twice, as seen by the agent and the loader LogSequence(caplog).contains("inmanta.agent.agent", DEBUG, f"Installing handler test::Test3 version={version_1}") LogSequence(caplog).contains("inmanta.agent.agent", DEBUG, f"Installing handler test::Test3 version={version_2}") @@ -244,7 +253,7 @@ async def get_version() -> int: environment=environment, version=version_3, resource_types=["test::Test4"] ) # Loader loads byte code file - await agent.ensure_code(code=resource_install_specs_5) + await agent.executor_manager.ensure_code(code=resource_install_specs_5) LogSequence(caplog).contains("inmanta.agent.agent", DEBUG, f"Installing handler test::Test4 version={version_3}") LogSequence(caplog).contains("inmanta.loader", INFO, f"Deploying code (hv={hv3}, module=inmanta_plugins.tests)").assert_not( "inmanta.loader", INFO, f"Deploying code (hv={hv3}, module=inmanta_plugins.tests)" @@ -256,20 +265,22 @@ async def get_version() -> int: environment=environment, version=version_4, resource_types=["test::Test4"] ) # Now load the python only version again - await agent.ensure_code(code=resource_install_specs_6) + await agent.executor_manager.ensure_code(code=resource_install_specs_6) assert getattr(inmanta, "test_agent_code_loading") == 10 # ensure we clean up on restart - assert os.path.exists(os.path.join(agent._loader.mod_dir, "tests/plugins/__init__.py")) + assert os.path.exists(os.path.join(agent.executor_manager._loader.mod_dir, "tests/plugins/__init__.py")) await agent.stop() await agent_factory( environment=environment, agent_map={"agent1": "localhost"}, hostname="host", agent_names=["agent1"], code_loader=True ) - assert not os.path.exists(os.path.join(agent._loader.mod_dir, "tests")) + assert not os.path.exists(os.path.join(agent.executor_manager._loader.mod_dir, "tests")) @pytest.mark.slowtest +@pytest.mark.parametrize("server_pre_start", ["forking"], indirect=True) async def test_agent_installs_dependency_containing_extras( + server_pre_start, server, client, environment, @@ -318,7 +329,21 @@ def test(): version=version, resource_types=["test::Test"], ) - await agent.ensure_code(install_spec) + executor = await agent.executor_manager.get_executor("agent1", "localhost", install_spec) + + installed_packages = executor.executor_virtual_env.get_installed_packages() + + def check_packages(package_list: Sequence[str], must_contain: set[str], must_not_contain: set[str]) -> None: + """ + Iterate over and check: + - all elements from are present + - no element from are present + """ + for package in package_list: + assert package not in must_not_contain + if package in must_contain: + must_contain.remove(package) + + assert not must_contain - assert agent._env.are_installed(["pkg", "dep-a"]) - assert not agent._env.are_installed(["dep-b", "dep-c"]) + check_packages(package_list=installed_packages, must_contain={"pkg", "dep-a"}, must_not_contain={"dep-b", "dep-c"})