Skip to content

Commit

Permalink
Move agent code loader setup to InProcessExecutorManager (Issue #7589,…
Browse files Browse the repository at this point in the history
… PR #7867)

# Description

closes  #7589

# Self Check:

Strike through any lines that are not applicable (`~~line~~`) then check the box

- [x] Attached issue to pull request
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [ ] Sufficient test cases (reproduces the bug/tests the requested feature)
~~- [ ] Correct, in line with design~~
~~- [ ] End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )~~
~~- [ ] If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~~
  • Loading branch information
Hugo-Inmanta authored and inmantaci committed Aug 1, 2024
1 parent 0644047 commit f04760e
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Move agent code loader setup to InProcessExecutorManager
change-type: patch
issue-nr: 7589
destination-branches: [master]
112 changes: 16 additions & 96 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@
import random
import time
import uuid
from asyncio import Lock
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Mapping, Sequence
from concurrent.futures.thread import ThreadPoolExecutor
from logging import Logger
from typing import Any, Collection, Dict, Optional, Union, cast

import pkg_resources

import inmanta.agent.executor
from inmanta import config, const, data, env, protocol
from inmanta import config, const, data, protocol
from inmanta.agent import config as cfg
from inmanta.agent import executor, forking_executor, in_process_executor
from inmanta.agent.executor import ResourceDetails, ResourceInstallSpec
Expand All @@ -49,20 +45,11 @@
ResourceType,
ResourceVersionIdStr,
)
from inmanta.loader import CodeLoader, ModuleSource
from inmanta.loader import ModuleSource
from inmanta.protocol import SessionEndpoint, SyncClient, methods, methods_v2
from inmanta.resources import Id
from inmanta.types import Apireturn, JsonType
from inmanta.util import (
CronSchedule,
IntervalSchedule,
NamedLock,
ScheduledTask,
TaskMethod,
TaskSchedule,
add_future,
join_threadpools,
)
from inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, TaskMethod, TaskSchedule, add_future, join_threadpools

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -972,27 +959,18 @@ def __init__(
self._instances: dict[str, AgentInstance] = {}
self._instances_lock = asyncio.Lock()

self._loader: Optional[CodeLoader] = None
self._env: Optional[env.VirtualEnv] = None
if code_loader:
# all of this should go into the executor manager https://github.com/inmanta/inmanta-core/issues/7589
self._env = env.VirtualEnv(self._storage["env"])
self._env.use_virtual_env()
self._loader = CodeLoader(self._storage["code"], clean=True)
# Lock to ensure only one actual install runs at a time
self._loader_lock = Lock()
# Keep track for each resource type of the last loaded version
self._last_loaded_version: dict[str, executor.ExecutorBlueprint | None] = defaultdict(lambda: None)
# Cache to prevent re-fetching the same resource-version
self._previously_loaded: dict[tuple[str, int], ResourceInstallSpec] = {}
# Per-resource lock to serialize all actions per resource
self._resource_loader_lock = NamedLock()
# Cache to prevent re-fetching the same resource-version
self._code_cache: dict[tuple[str, int], ResourceInstallSpec] = {}

self.agent_map: Optional[dict[str, str]] = agent_map

remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExecutorMode.forking
can_have_remote_executor = code_loader

# Mechanism to speed up tests using the old (<= iso7) agent mechanism
# by avoiding spawning a virtual environment.
self._code_loader = code_loader

self.executor_manager: executor.ExecutorManager[executor.Executor]
if remote_executor and can_have_remote_executor:
LOGGER.info("Selected forking agent executor mode")
Expand All @@ -1016,6 +994,9 @@ def __init__(
asyncio.get_event_loop(),
LOGGER,
self,
self._storage["code"],
self._storage["env"],
code_loader,
)

async def _init_agent_map(self) -> None:
Expand Down Expand Up @@ -1225,7 +1206,7 @@ async def get_code(
- collection of ResourceInstallSpec for resource_types with valid handler code and pip config
- set of invalid resource_types (no handler code and/or invalid pip config)
"""
if self._loader is None:
if not self._code_loader:
return [], set()

# store it outside the loop, but only load when required
Expand All @@ -1234,7 +1215,7 @@ async def get_code(
resource_install_specs: list[ResourceInstallSpec] = []
invalid_resource_types: executor.FailedResourcesSet = set()
for resource_type in set(resource_types):
cached_spec: Optional[ResourceInstallSpec] = self._previously_loaded.get((resource_type, version))
cached_spec: Optional[ResourceInstallSpec] = self._code_cache.get((resource_type, version))
if cached_spec:
LOGGER.debug(
"Cache hit, using existing ResourceInstallSpec for resource_type=%s version=%d", resource_type, version
Expand Down Expand Up @@ -1270,10 +1251,10 @@ async def get_code(
resource_type, version, executor.ExecutorBlueprint(pip_config, list(requirements), sources)
)
resource_install_specs.append(resource_install_spec)
# Update the ``_previously_loaded`` cache to indicate that the given resource type's ResourceInstallSpec
# Update the ``_code_cache`` cache to indicate that the given resource type's ResourceInstallSpec
# was constructed successfully at the specified version.
# TODO: this cache is a slight memory leak
self._previously_loaded[(resource_type, version)] = resource_install_spec
self._code_cache[(resource_type, version)] = resource_install_spec
else:
LOGGER.error(
"Failed to get source code for %s version=%d\n%s",
Expand All @@ -1285,67 +1266,6 @@ async def get_code(

return resource_install_specs, invalid_resource_types

async def ensure_code(self, code: Collection[ResourceInstallSpec]) -> executor.FailedResourcesSet:
"""Ensure that the code for the given environment and version is loaded"""

failed_to_load: executor.FailedResourcesSet = set()

if self._loader is None:
return failed_to_load

for resource_install_spec in code:
# only one logical thread can load a particular resource type at any time
async with self._resource_loader_lock.get(resource_install_spec.resource_type):
# stop if the last successful load was this one
# The combination of the lock and this check causes the reloads to naturally 'batch up'
if self._last_loaded_version[resource_install_spec.resource_type] == resource_install_spec.blueprint:
LOGGER.debug(
"Handler code already installed for %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
continue

try:
# Install required python packages and the list of ``ModuleSource`` with the provided pip config
LOGGER.debug(
"Installing handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
await self._install(resource_install_spec.blueprint)
LOGGER.debug(
"Installed handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)

self._last_loaded_version[resource_install_spec.resource_type] = resource_install_spec.blueprint
except Exception:
LOGGER.exception(
"Failed to install handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
failed_to_load.add(resource_install_spec.resource_type)
self._last_loaded_version[resource_install_spec.resource_type] = None

return failed_to_load

async def _install(self, blueprint: executor.ExecutorBlueprint) -> None:
if self._env is None or self._loader is None:
raise Exception("Unable to load code when agent is started with code loading disabled.")

async with self._loader_lock:
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self.thread_pool,
self._env.install_for_config,
list(pkg_resources.parse_requirements(blueprint.requirements)),
blueprint.pip_config,
)
await loop.run_in_executor(self.thread_pool, self._loader.deploy_version, blueprint.sources)

async def _get_pip_config(self, environment: uuid.UUID, version: int) -> PipConfig:
response = await self._client.get_pip_config(tid=environment, version=version)
if response.code != 200:
Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"config",
"use_autostart_agent_map",
False,
"""If this option is set to true, the agent-map of this agent will be set the the autostart_agent_map configured on the
"""If this option is set to true, the agent-map of this agent will be set to the autostart_agent_map configured on the
server. The agent_map will be kept up-to-date automatically.""",
is_bool,
)
Expand Down
88 changes: 86 additions & 2 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,29 @@
import logging
import typing
import uuid
from asyncio import Lock
from collections import defaultdict
from collections.abc import Sequence
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, Optional

import pkg_resources

import inmanta.agent.cache
import inmanta.protocol
import inmanta.util
import logfire
from inmanta import const, data
from inmanta import const, data, env
from inmanta.agent import executor, handler
from inmanta.agent.executor import FailedResourcesSet, ResourceDetails
from inmanta.agent.handler import HandlerAPI, SkipResource
from inmanta.agent.io.remote import ChannelClosedException
from inmanta.const import ParameterSource
from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr
from inmanta.loader import CodeLoader
from inmanta.resources import Id, Resource
from inmanta.types import Apireturn
from inmanta.util import NamedLock

if typing.TYPE_CHECKING:
import inmanta.agent.agent as agent
Expand Down Expand Up @@ -446,6 +452,9 @@ def __init__(
eventloop: asyncio.AbstractEventLoop,
parent_logger: logging.Logger,
process: "agent.Agent",
code_dir: str,
env_dir: str,
code_loader: bool = True,
) -> None:
self.environment = environment
self.client = client
Expand All @@ -456,6 +465,20 @@ def __init__(
self.executors: dict[str, InProcessExecutor] = {}
self._creation_locks: inmanta.util.NamedLock = inmanta.util.NamedLock()

self._loader: CodeLoader | None = None
self._env: env.VirtualEnv | None = None

if code_loader:
self._env = env.VirtualEnv(env_dir)
self._env.use_virtual_env()
self._loader = CodeLoader(code_dir, clean=True)
# Lock to ensure only one actual install runs at a time
self._loader_lock: asyncio.Lock = Lock()
# Keep track for each resource type of the last loaded version
self._last_loaded_version: dict[str, executor.ExecutorBlueprint | None] = defaultdict(lambda: None)
# Per-resource lock to serialize all actions per resource
self._resource_loader_lock: NamedLock = NamedLock()

async def stop(self) -> None:
for child in self.executors.values():
child.stop()
Expand Down Expand Up @@ -499,7 +522,68 @@ async def get_executor(
out = InProcessExecutor(agent_name, agent_uri, self.environment, self.client, self.eventloop, self.logger)
self.executors[agent_name] = out
assert out.uri == agent_uri
failed_resource_types: FailedResourcesSet = await self.process.ensure_code(code)
failed_resource_types: FailedResourcesSet = await self.ensure_code(code)
out.failed_resource_types = failed_resource_types

return out

async def ensure_code(self, code: typing.Collection[executor.ResourceInstallSpec]) -> executor.FailedResourcesSet:
"""Ensure that the code for the given environment and version is loaded"""

failed_to_load: executor.FailedResourcesSet = set()

if self._loader is None:
return failed_to_load

for resource_install_spec in code:
# only one logical thread can load a particular resource type at any time
async with self._resource_loader_lock.get(resource_install_spec.resource_type):
# stop if the last successful load was this one
# The combination of the lock and this check causes the reloads to naturally 'batch up'
if self._last_loaded_version[resource_install_spec.resource_type] == resource_install_spec.blueprint:
self.logger.debug(
"Handler code already installed for %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
continue

try:
# Install required python packages and the list of ``ModuleSource`` with the provided pip config
self.logger.debug(
"Installing handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
await self._install(resource_install_spec.blueprint)
self.logger.debug(
"Installed handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)

self._last_loaded_version[resource_install_spec.resource_type] = resource_install_spec.blueprint
except Exception:
self.logger.exception(
"Failed to install handler %s version=%d",
resource_install_spec.resource_type,
resource_install_spec.model_version,
)
failed_to_load.add(resource_install_spec.resource_type)
self._last_loaded_version[resource_install_spec.resource_type] = None

return failed_to_load

async def _install(self, blueprint: executor.ExecutorBlueprint) -> None:
if self._env is None or self._loader is None:
raise Exception("Unable to load code when agent is started with code loading disabled.")

async with self._loader_lock:
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self.process.thread_pool,
self._env.install_for_config,
list(pkg_resources.parse_requirements(blueprint.requirements)),
blueprint.pip_config,
)
await loop.run_in_executor(self.process.thread_pool, self._loader.deploy_version, blueprint.sources)
Loading

0 comments on commit f04760e

Please sign in to comment.