Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failed state and test second deploy #8090

Closed
wants to merge 13 commits into from
4 changes: 4 additions & 0 deletions changelogs/unreleased/8010-agent-scheduler_2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Add testcases for agent and server
issue-nr: 8010
change-type: patch
destination-branches: [master]
3 changes: 2 additions & 1 deletion src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from inmanta import const
from inmanta.agent import config as cfg
from inmanta.agent import resourcepool
from inmanta.const import ResourceState
from inmanta.data.model import PipConfig, ResourceIdStr, ResourceType, ResourceVersionIdStr
from inmanta.env import PythonEnvironment
from inmanta.loader import ModuleSource
Expand Down Expand Up @@ -486,7 +487,7 @@ async def execute(
gid: uuid.UUID,
resource_details: ResourceDetails,
reason: str,
) -> None:
) -> ResourceState:
"""
Perform the actual deployment of the resource by calling the loaded handler code

Expand Down
10 changes: 5 additions & 5 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ async def call(self, context: ExecutorContext) -> None:
await context.get(self.agent_name).dry_run(self.resources, self.dry_run_id)


class ExecuteCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, None]):
class ExecuteCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, const.ResourceState]):
"""Run a deploy in an executor"""

def __init__(
Expand All @@ -495,8 +495,8 @@ def __init__(
self.resource_details = resource_details
self.reason = reason

async def call(self, context: ExecutorContext) -> None:
await context.get(self.agent_name).execute(self.gid, self.resource_details, self.reason)
async def call(self, context: ExecutorContext) -> const.ResourceState:
return await context.get(self.agent_name).execute(self.gid, self.resource_details, self.reason)


class FactsCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, inmanta.types.Apireturn]):
Expand Down Expand Up @@ -803,8 +803,8 @@ async def execute(
gid: uuid.UUID,
resource_details: "inmanta.agent.executor.ResourceDetails",
reason: str,
) -> None:
await self.call(ExecuteCommand(self.id.agent_name, gid, resource_details, reason))
) -> const.ResourceState:
return await self.call(ExecuteCommand(self.id.agent_name, gid, resource_details, reason))

async def get_facts(self, resource: "inmanta.agent.executor.ResourceDetails") -> inmanta.types.Apireturn:
return await self.call(FactsCommand(self.id.agent_name, resource))
Expand Down
13 changes: 9 additions & 4 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from inmanta.agent import executor, handler
from inmanta.agent.executor import FailedResources, ResourceDetails
from inmanta.agent.handler import HandlerAPI, SkipResource
from inmanta.const import ParameterSource
from inmanta.const import ParameterSource, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr
from inmanta.loader import CodeLoader
from inmanta.resources import Id, Resource
Expand Down Expand Up @@ -257,11 +257,11 @@ async def execute(
gid: uuid.UUID,
resource_details: ResourceDetails,
reason: str,
) -> None:
) -> ResourceState:
try:
resource: Resource | None = await self.deserialize(resource_details, const.ResourceAction.deploy)
except Exception:
return
return const.ResourceState.unavailable
assert resource is not None
ctx = handler.HandlerContext(resource, logger=self.logger)
ctx.debug(
Expand All @@ -279,7 +279,7 @@ async def execute(
except Exception:
ctx.set_status(const.ResourceState.failed)
ctx.exception("Failed to report the start of the deployment to the server")
return
return const.ResourceState.failed

async with self.activity_lock:
with self._cache:
Expand All @@ -298,6 +298,11 @@ async def execute(
ctx.error("Failed to send facts to the server %s", set_fact_response.result)

await self._report_resource_deploy_done(resource_details, ctx)
# context should not be none at this point
if ctx.status is None:
ctx.error("Status not set, should not happen")
return const.ResourceState.failed
return ctx.status

async def dry_run(
self,
Expand Down
8 changes: 7 additions & 1 deletion src/inmanta/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from inmanta.export import cfg_env
from inmanta.logging import InmantaLoggerConfig, LoggerMode, LoggerModeManager, _is_on_tty
from inmanta.server.bootloader import InmantaBootloader
from inmanta.server.services.databaseservice import server_db_connect
from inmanta.signals import safe_shutdown, setup_signal_handlers
from inmanta.util import get_compiler_version
from inmanta.warnings import WarningsManager
Expand Down Expand Up @@ -158,11 +159,16 @@ def start_scheduler(options: argparse.Namespace) -> None:
AsyncHTTPClient.configure(None, max_clients=max_clients)

tracing.configure_logfire("agent_rs")

util.ensure_event_loop()
a = agent_new.Agent()

async def start() -> None:
await server_db_connect()
await a.start()

setup_signal_handlers(a.stop)
IOLoop.current().add_callback(a.start)
IOLoop.current().add_callback(start)
IOLoop.current().start()
LOGGER.info("Agent with Resource scheduler Shutdown complete")

Expand Down
130 changes: 12 additions & 118 deletions src/inmanta/deploy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@
"""

import asyncio
import datetime
import logging
import traceback
import uuid
from collections.abc import Mapping, Set
from typing import Any, Optional
from typing import Any

from inmanta import const, data, resources
from inmanta import data, resources
from inmanta.agent import executor
from inmanta.agent.code_manager import CodeManager
from inmanta.const import ResourceAction
from inmanta.data import Resource
from inmanta.data.model import ResourceIdStr
from inmanta.deploy import work
from inmanta.deploy.state import ModelState, ResourceDetails, ResourceStatus
from inmanta.deploy.work import PoisonPill
from inmanta.deploy.work import Task
from inmanta.protocol import Client

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -113,9 +110,7 @@ async def deploy(self) -> None:
"""
async with self._scheduler_lock:
# FIXME[#8008]: more efficient access to dirty set by caching it on the ModelState
dirty: Set[ResourceIdStr] = {
r for r, details in self._state.resource_state.items() if details.status == ResourceStatus.HAS_UPDATE
}
dirty: Set[ResourceIdStr] = {r for r, details in self._state.resource_state.items() if details.needs_deploy()}
# FIXME[#8008]: pass in running deploys
self._work.update_state(ensure_scheduled=dirty, running_deploys=set())

Expand Down Expand Up @@ -248,115 +243,14 @@ def start_for_agent(self, agent: str) -> None:
"""Start processing for the given agent"""
self._workers[agent] = asyncio.create_task(self._run_for_agent(agent))

async def _run_task(self, agent: str, task: work.Task, resource_details: ResourceDetails) -> None:
"""Run a task"""
match task:
case work.Deploy():
await self.perform_deploy(agent, resource_details)
case _:
print("Nothing here!")

async def perform_deploy(self, agent: str, resource_details: ResourceDetails) -> None:
"""
Perform an actual deploy on an agent.

:param agent:
:param resource_details:
"""
# FIXME: WDB to Sander: is the version of the state the correct version?
# It may happen that the set of types no longer matches the version?
# FIXME: code loading interface is not nice like this,
# - we may want to track modules per agent, instead of types
# - we may also want to track the module version vs the model version
# as it avoid the problem of fast chanfing model versions

async def report_deploy_failure(excn: Exception) -> None:
res_type = resource_details.id.entity_type
log_line = data.LogLine.log(
logging.ERROR,
"All resources of type `%(res_type)s` failed to load handler code or install handler code "
"dependencies: `%(error)s`\n%(traceback)s",
res_type=res_type,
error=str(excn),
traceback="".join(traceback.format_tb(excn.__traceback__)),
)
await self._client.resource_action_update(
tid=self._environment,
resource_ids=[resource_details.rvid],
action_id=uuid.uuid4(),
action=ResourceAction.deploy,
started=datetime.datetime.now().astimezone(),
finished=datetime.datetime.now().astimezone(),
messages=[log_line],
status=const.ResourceState.unavailable,
)

# Find code
code, invalid_resources = await self._code_manager.get_code(
environment=self._environment, version=self._state.version, resource_types=self._state.get_types_for_agent(agent)
)

# Bail out if this failed
if resource_details.id.entity_type in invalid_resources:
await report_deploy_failure(invalid_resources[resource_details.id.entity_type])
return

# Get executor
my_executor: executor.Executor = await self._executor_manager.get_executor(
agent_name=agent, agent_uri="NO_URI", code=code
)
failed_resources = my_executor.failed_resources

# Bail out if this failed
if resource_details.id.entity_type in failed_resources:
await report_deploy_failure(failed_resources[resource_details.id.entity_type])
return

# DEPLOY!!!
gid = uuid.uuid4()
# FIXME: reason argument is not used
await my_executor.execute(gid, resource_details, "New Scheduler initiated action")

async def _work_once(self, agent: str) -> None:
task: work.Task = await self._work.agent_queues.queue_get(agent)
# FIXME[#8008]: skip and reschedule deploy / refresh-fact task if resource marked as update pending?
if isinstance(task, PoisonPill):
# wake up and return, queue will be shut down
return
resource_details: ResourceDetails
async with self._scheduler_lock:
# fetch resource details atomically under lock
try:
resource_details = self._state.resources[task.resource]
except KeyError:
# Stale resource, can simply be dropped.
# May occur in rare races between new_version and acquiring the lock we're under here. This race is safe
# because of this check, and an intrinsic part of the locking design because it's preferred over wider
# locking for performance reasons.
return

await self._run_task(agent, task, resource_details)

# post-processing
match task:
case work.Deploy():
async with self._scheduler_lock:
# refresh resource details for latest model state
new_details: Optional[ResourceDetails] = self._state.resources.get(task.resource, None)
if new_details is not None and new_details.attribute_hash == resource_details.attribute_hash:
# FIXME[#8010]: pass success/failure to notify_provides()
# FIXME[#8008]: iff deploy was successful set resource status and deployment result
# in self.state.resources
self._work.notify_provides(task)
# The deploy that finished has become stale (state has changed since the deploy started).
# Nothing to report on a stale deploy.
# A new deploy for the current model state will have been queued already.
case _:
# nothing to do
pass
self._work.agent_queues.task_done(agent)

async def _run_for_agent(self, agent: str) -> None:
"""Main loop for one agent"""
while self._running:
await self._work_once(agent)
task: Task = await self._work.agent_queues.queue_get(agent)
try:
# FIXME[#8008]: skip and reschedule deploy / refresh-fact task if resource marked as update pending?
await task.execute(self, agent)
except Exception:
LOGGER.exception("Task %s for agent %s has failed and the exception was not properly handled", task, agent)

self._work.agent_queues.task_done(agent)
3 changes: 3 additions & 0 deletions src/inmanta/deploy/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ class ResourceState:
status: ResourceStatus
deployment_result: DeploymentResult

def needs_deploy(self) -> bool:
return self.status == ResourceStatus.HAS_UPDATE or self.deployment_result != DeploymentResult.DEPLOYED


@dataclass(kw_only=True)
class ModelState:
Expand Down
Loading