Skip to content

Commit

Permalink
Fixed bug that caused code loading problems not to be reported into t…
Browse files Browse the repository at this point in the history
…he resource action log. (Issue #8722, PR #8749)

# Description

* Fixed bug that caused code loading problems not to be reported into the resource action log.
* Fixed bug where executor processes would remain hanging if a code loading problem occurs.
* Fixed race condition in the `wait_until_deployment_finishes` method.

closes #8722

# Self Check:

- [x] Attached issue to pull request
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [x] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
- [ ] ~~End user documentation is included or an issue is created for end-user documentation~~
- [ ] ~~If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branch(es) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~~
  • Loading branch information
arnaudsjs authored and inmantaci committed Feb 17, 2025
1 parent 868959c commit 9b7b2cb
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 44 deletions.
8 changes: 8 additions & 0 deletions changelogs/unreleased/8722-report-code-loading-problems.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
description: Fixed bug that caused code loading problems not to be reported into the resource action log.
issue-nr: 8722
issue-repo: inmanta-core
change-type: patch
destination-branches: [master, iso8]
sections:
bugfix: "{{description}}"
58 changes: 32 additions & 26 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def my_name(self) -> str:
return f"Executor Process {self.name} for PID {self.process.pid}"

def render_id(self, member_id: executor.ExecutorId) -> str:
return f"Executor for {member_id.agent_name}"
return f"executor for {member_id.agent_name}"

def get_lock_name_for(self, member_id: executor.ExecutorId) -> str:
return member_id.identity()
Expand Down Expand Up @@ -886,7 +886,7 @@ def my_name(self) -> str:
return "Process pool"

def render_id(self, member: executor.ExecutorBlueprint) -> str:
return "Process for code hash: " + member.blueprint_hash()
return "process for code hash: " + member.blueprint_hash()

def get_lock_name_for(self, member_id: executor.ExecutorBlueprint) -> str:
return member_id.blueprint_hash()
Expand Down Expand Up @@ -923,32 +923,38 @@ def _id_to_internal(self, ext_id: executor.ExecutorBlueprint) -> executor.Execut
return ext_id

async def create_member(self, blueprint: executor.ExecutorBlueprint) -> MPProcess:

venv = await self.environment_manager.get_environment(blueprint.to_env_blueprint())
executor = await self.make_child_and_connect(blueprint, venv)
LOGGER.debug(
"Child forked (pid: %s) for %s",
executor.process.pid,
self.render_id(blueprint),
)
storage_for_blueprint = os.path.join(self.code_folder, blueprint.blueprint_hash())
os.makedirs(storage_for_blueprint, exist_ok=True)
failed_types = await executor.connection.call(
InitCommand(
venv.env_path,
storage_for_blueprint,
self.session_gid,
[x.for_transport() for x in blueprint.sources],
self.venv_checkup_interval,
try:
LOGGER.debug(
"Child forked (pid: %s) for %s",
executor.process.pid,
self.render_id(blueprint),
)
storage_for_blueprint = os.path.join(self.code_folder, blueprint.blueprint_hash())
os.makedirs(storage_for_blueprint, exist_ok=True)
failed_types = await executor.connection.call(
InitCommand(
venv.env_path,
storage_for_blueprint,
self.session_gid,
[x.for_transport() for x in blueprint.sources],
self.venv_checkup_interval,
)
)
LOGGER.debug(
"Child initialized (pid: %s) for %s",
executor.process.pid,
self.render_id(blueprint),
)
executor.failed_resource_results = failed_types
return executor
except Exception:
# Make sure to cleanup the executor process if its initialization fails.
await executor.request_shutdown()
raise Exception(
f"Failed to initialize scheduler process (pid: {executor.process.pid}) for {self.render_id(blueprint)}"
)
)
LOGGER.debug(
"Child initialized (pid: %s) for %s",
executor.process.pid,
self.render_id(blueprint),
)
executor.failed_resource_results = failed_types
return executor

async def make_child_and_connect(
self, executor_id: executor.ExecutorBlueprint, venv: executor.ExecutorVirtualEnvironment
Expand Down Expand Up @@ -1034,7 +1040,7 @@ def _id_to_internal(self, ext_id: executor.ExecutorId) -> executor.ExecutorId:
return ext_id

def render_id(self, member: executor.ExecutorId) -> str:
return f"Executor for {member.agent_name}"
return f"executor for {member.agent_name}"

def my_name(self) -> str:
return "Executor Manager"
Expand Down
3 changes: 2 additions & 1 deletion src/inmanta/agent/resourcepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def render_id(self, member: TPoolID) -> str:
return "PoolMember"

def member_name(self, member: TPoolMember) -> str:
"""Method to improve logging output by naming the members, best kept consitent with render_id"""
"""Method to improve logging output by naming the members, best kept consistent with render_id"""
return self.render_id(member.get_id())

def get_lock_name_for(self, member_id: TIntPoolID) -> str:
Expand Down Expand Up @@ -271,6 +271,7 @@ async def _create_or_replace(self, member_id: TPoolID, internal_id: TIntPoolID)

my_executor = await self.create_member(member_id)
assert my_executor.id == internal_id

self.pool[internal_id] = my_executor
my_executor.termination_listeners.append(self.notify_member_shutdown)

Expand Down
23 changes: 10 additions & 13 deletions src/inmanta/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def __check_dir(self, clean: bool = False) -> None:

def load_module(self, mod_name: str, hv: str) -> None:
"""
Ensure the given module is loaded.
Ensure the given module is loaded. Does not capture any import errors.
:param mod_name: Name of the module to load
:param hv: hash value of the content of the module
Expand All @@ -298,18 +298,15 @@ def load_module(self, mod_name: str, hv: str) -> None:

# Importing a module -> only the first import loads the code
# cache of loaded modules mechanism -> starts afresh when agent is restarted
try:
if mod_name in self.__modules:
if hv != self.__modules[mod_name][0]:
raise Exception(f"The content of module {mod_name} changed since it was last imported.")
LOGGER.debug("Module %s is already loaded", mod_name)
return
else:
mod = importlib.import_module(mod_name)
self.__modules[mod_name] = (hv, mod)
LOGGER.info("Loaded module %s", mod_name)
except ImportError:
LOGGER.exception("Unable to load module %s", mod_name)
if mod_name in self.__modules:
if hv != self.__modules[mod_name][0]:
raise Exception(f"The content of module {mod_name} changed since it was last imported.")
LOGGER.debug("Module %s is already loaded", mod_name)
return
else:
mod = importlib.import_module(mod_name)
self.__modules[mod_name] = (hv, mod)
LOGGER.info("Loaded module %s", mod_name)

def install_source(self, module_source: ModuleSource) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ def ensure_event_loop() -> asyncio.AbstractEventLoop:
"""
try:
# nothing needs to be done if this thread already has an event loop
return asyncio.get_event_loop()
return asyncio.get_running_loop()
except RuntimeError:
# asyncio.set_event_loop sets the event loop for this thread only
new_loop = asyncio.new_event_loop()
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@
environment.
"""

"""
About the fixtures that control the behavior related to the scheduler:
* Default behavior: We expect the test case to not (auto)start a scheduler. Any attempt to autostart the scheduler
will result in an AssertionError.
* `auto_start_agent` fixture: Indicates we expect the scheduler to be autostarted.
=> Usage: Add the `@pytest.mark.parametrize("auto_start_agent", [True])` annotation on the test case.
* `null_agent` fixture: Create an agent that doesn't do anything. The test case just expects the agent to exist and be up.
=> Usage: Regular fixture instantiation.
* `agent` fixture: Create a full in-process agent that fully works.
=> Usage: Regular fixture instantiation
* `no_agent` fixture: Disables the scheduler autostart functionality, any attempt to start the scheduler is ignored.
=> Usage: Add the `@pytest.mark.parametrize("no_agent", [True])` annotation on the test case.
"""

import asyncio
import concurrent
import csv
Expand Down
55 changes: 54 additions & 1 deletion tests/deploy/e2e/test_code_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from inmanta.server import SLICE_AGENT_MANAGER
from inmanta.server.server import Server
from inmanta.util import get_compiler_version
from utils import ClientHelper, DummyCodeManager, log_index, retry_limited
from utils import ClientHelper, DummyCodeManager, log_index, retry_limited, wait_until_deployment_finishes

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -282,3 +282,56 @@ async def _install(blueprint: executor.ExecutorBlueprint) -> None:
log_index(caplog, "test_code_loading", logging.ERROR, "Failed to install handler test::Test version=1", idx1)

log_index(caplog, "test_code_loading", logging.ERROR, "Failed to install handler test::Test2 version=1", idx1)


@pytest.mark.parametrize("auto_start_agent", [True])
async def test_logging_on_code_loading_failure(server, client, environment, clienthelper):
    """
    This test case ensures that if handler code cannot be loaded, this is reported in the resource action log.

    The test uploads a module whose source raises as soon as it is executed, releases a version
    containing one resource of type `test::Test`, waits for the deployment to finish and then
    checks the ERROR-level resource action log for the code loading failure message.
    """
    # Module source that raises unconditionally when executed, so loading it always fails.
    code = """
raise Exception("Fail code loading")
"""

    # Register the broken source as `inmanta_plugins.test`.
    # NOTE(review): make_source_structure is defined outside this view; presumably it uploads the
    # file through `client` and records it in `sources` -- confirm against the helper.
    sources = {}
    await make_source_structure(
        sources,
        "inmanta_plugins/test/__init__.py",
        "inmanta_plugins.test",
        code,
        dependencies=[],
        client=client,
    )

    version = await clienthelper.get_version()

    # Create a version with a single resource of the type whose handler code is broken.
    res = await client.put_version(
        tid=environment,
        version=version,
        resources=[
            {
                "id": "test::Test[agent,name=test],v=%d" % version,
                "purged": False,
                "requires": [],
                "version": version,
            }
        ],
        compiler_version=get_compiler_version(),
    )
    assert res.code == 200

    # Associate the broken sources with the resource type `test::Test` for this version.
    res = await client.upload_code_batched(tid=environment, id=version, resources={"test::Test": sources})
    assert res.code == 200

    res = await client.release_version(tid=environment, id=version)
    assert res.code == 200

    await wait_until_deployment_finishes(client, environment, version=version)

    # Only ERROR-level messages for this resource type and agent are requested.
    result = await client.get_resource_actions(tid=environment, resource_type="test::Test", agent="agent", log_severity="ERROR")
    assert result.code == 200
    # At least one log line of one resource action must mention the code loading failure.
    assert any(
        "All resources of type `test::Test` failed to load handler code or install handler code dependencies" in log_line["msg"]
        for resource_action in result.result["data"]
        for log_line in resource_action["messages"]
    )
2 changes: 1 addition & 1 deletion tests/forking_agent/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def check_automatic_clean_up() -> bool:
caplog,
"inmanta.agent.resourcepool",
logging.DEBUG,
("Executor for agent2 will be shutdown because it was inactive for "),
("executor for agent2 will be shutdown because it was inactive for "),
)

# We can get `Caught subprocess termination from unknown pid: %d -> %d`
Expand Down
6 changes: 5 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ async def done() -> bool:

if version >= 0:
scheduler = await data.Scheduler.get_one(environment=environment)
if scheduler.last_processed_model_version is None or scheduler.last_processed_model_version < version:
if (
scheduler is None
or scheduler.last_processed_model_version is None
or scheduler.last_processed_model_version < version
):
return False

result = await client.resource_list(environment, deploy_summary=True)
Expand Down

0 comments on commit 9b7b2cb

Please sign in to comment.