diff --git a/changelogs/unreleased/8010-agent-scheduler_2.yml b/changelogs/unreleased/8010-agent-scheduler_2.yml new file mode 100644 index 0000000000..613aa54d20 --- /dev/null +++ b/changelogs/unreleased/8010-agent-scheduler_2.yml @@ -0,0 +1,4 @@ +description: Add test cases for agent and server +issue-nr: 8010 +change-type: patch +destination-branches: [master] diff --git a/src/inmanta/agent/executor.py b/src/inmanta/agent/executor.py index 15746a14a9..99a172caf2 100644 --- a/src/inmanta/agent/executor.py +++ b/src/inmanta/agent/executor.py @@ -40,6 +40,7 @@ from inmanta import const from inmanta.agent import config as cfg from inmanta.agent import resourcepool +from inmanta.const import ResourceState from inmanta.data.model import PipConfig, ResourceIdStr, ResourceType, ResourceVersionIdStr from inmanta.env import PythonEnvironment from inmanta.loader import ModuleSource @@ -486,7 +487,7 @@ async def execute( gid: uuid.UUID, resource_details: ResourceDetails, reason: str, - ) -> None: + ) -> ResourceState: """ Perform the actual deployment of the resource by calling the loaded handler code diff --git a/src/inmanta/agent/forking_executor.py b/src/inmanta/agent/forking_executor.py index b0e5313765..88c72cfbe4 100644 --- a/src/inmanta/agent/forking_executor.py +++ b/src/inmanta/agent/forking_executor.py @@ -480,7 +480,7 @@ async def call(self, context: ExecutorContext) -> None: await context.get(self.agent_name).dry_run(self.resources, self.dry_run_id) -class ExecuteCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, None]): +class ExecuteCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, const.ResourceState]): """Run a deploy in an executor""" def __init__( @@ -495,8 +495,8 @@ def __init__( self.resource_details = resource_details self.reason = reason - async def call(self, context: ExecutorContext) -> None: - await context.get(self.agent_name).execute(self.gid, self.resource_details, self.reason) + async def call(self, context: ExecutorContext) -> const.ResourceState: + return await context.get(self.agent_name).execute(self.gid, self.resource_details, self.reason) class FactsCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, inmanta.types.Apireturn]): @@ -803,8 +803,8 @@ async def execute( gid: uuid.UUID, resource_details: "inmanta.agent.executor.ResourceDetails", reason: str, - ) -> None: - await self.call(ExecuteCommand(self.id.agent_name, gid, resource_details, reason)) + ) -> const.ResourceState: + return await self.call(ExecuteCommand(self.id.agent_name, gid, resource_details, reason)) async def get_facts(self, resource: "inmanta.agent.executor.ResourceDetails") -> inmanta.types.Apireturn: return await self.call(FactsCommand(self.id.agent_name, resource)) diff --git a/src/inmanta/agent/in_process_executor.py b/src/inmanta/agent/in_process_executor.py index 9049e0ef96..52fc4af796 100644 --- a/src/inmanta/agent/in_process_executor.py +++ b/src/inmanta/agent/in_process_executor.py @@ -32,7 +32,7 @@ from inmanta.agent import executor, handler from inmanta.agent.executor import FailedResources, ResourceDetails from inmanta.agent.handler import HandlerAPI, SkipResource -from inmanta.const import ParameterSource +from inmanta.const import ParameterSource, ResourceState from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr from inmanta.loader import CodeLoader from inmanta.resources import Id, Resource @@ -257,11 +257,11 @@ async def execute( gid: uuid.UUID, resource_details: ResourceDetails, reason: str, - ) -> None: + ) ->
ResourceState: try: resource: Resource | None = await self.deserialize(resource_details, const.ResourceAction.deploy) except Exception: - return + return const.ResourceState.unavailable assert resource is not None ctx = handler.HandlerContext(resource, logger=self.logger) ctx.debug( @@ -279,7 +279,7 @@ async def execute( except Exception: ctx.set_status(const.ResourceState.failed) ctx.exception("Failed to report the start of the deployment to the server") - return + return const.ResourceState.failed async with self.activity_lock: with self._cache: @@ -298,6 +298,11 @@ async def execute( ctx.error("Failed to send facts to the server %s", set_fact_response.result) await self._report_resource_deploy_done(resource_details, ctx) + # the status should not be None at this point + if ctx.status is None: + ctx.error("Status not set, should not happen") + return const.ResourceState.failed + return ctx.status async def dry_run( self, diff --git a/src/inmanta/app.py b/src/inmanta/app.py index b91254741e..c5ad50db2f 100644 --- a/src/inmanta/app.py +++ b/src/inmanta/app.py @@ -64,6 +64,7 @@ from inmanta.export import cfg_env from inmanta.logging import InmantaLoggerConfig, LoggerMode, LoggerModeManager, _is_on_tty from inmanta.server.bootloader import InmantaBootloader +from inmanta.server.services.databaseservice import server_db_connect from inmanta.signals import safe_shutdown, setup_signal_handlers from inmanta.util import get_compiler_version from inmanta.warnings import WarningsManager @@ -158,11 +159,16 @@ def start_scheduler(options: argparse.Namespace) -> None: AsyncHTTPClient.configure(None, max_clients=max_clients) tracing.configure_logfire("agent_rs") + util.ensure_event_loop() a = agent_new.Agent() + async def start() -> None: + await server_db_connect() + await a.start() + setup_signal_handlers(a.stop) - IOLoop.current().add_callback(a.start) + IOLoop.current().add_callback(start) IOLoop.current().start() LOGGER.info("Agent with Resource scheduler Shutdown complete") diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index 5395719f63..d4b4ffafc2 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -17,22 +17,19 @@ """ import asyncio -import datetime import logging -import traceback import uuid from collections.abc import Mapping, Set -from typing import Any, Optional +from typing import Any -from inmanta import const, data, resources +from inmanta import data, resources from inmanta.agent import executor from inmanta.agent.code_manager import CodeManager -from inmanta.const import ResourceAction from inmanta.data import Resource from inmanta.data.model import ResourceIdStr from inmanta.deploy import work from inmanta.deploy.state import ModelState, ResourceDetails, ResourceStatus -from inmanta.deploy.work import PoisonPill +from inmanta.deploy.tasks import Task from inmanta.protocol import Client LOGGER = logging.getLogger(__name__) @@ -113,9 +110,7 @@ async def deploy(self) -> None: """ async with self._scheduler_lock: # FIXME[#8008]: more efficient access to dirty set by caching it on the ModelState - dirty: Set[ResourceIdStr] = { - r for r, details in self._state.resource_state.items() if details.status == ResourceStatus.HAS_UPDATE - } + dirty: Set[ResourceIdStr] = {r for r, details in self._state.resource_state.items() if details.needs_deploy()} # FIXME[#8008]: pass in running deploys self._work.update_state(ensure_scheduled=dirty, running_deploys=set()) @@ -248,115 +243,14 @@ def start_for_agent(self, agent: str) -> None:
"""Start processing for the given agent""" self._workers[agent] = asyncio.create_task(self._run_for_agent(agent)) - async def _run_task(self, agent: str, task: work.Task, resource_details: ResourceDetails) -> None: - """Run a task""" - match task: - case work.Deploy(): - await self.perform_deploy(agent, resource_details) - case _: - print("Nothing here!") - - async def perform_deploy(self, agent: str, resource_details: ResourceDetails) -> None: - """ - Perform an actual deploy on an agent. - - :param agent: - :param resource_details: - """ - # FIXME: WDB to Sander: is the version of the state the correct version? - # It may happen that the set of types no longer matches the version? - # FIXME: code loading interface is not nice like this, - # - we may want to track modules per agent, instead of types - # - we may also want to track the module version vs the model version - # as it avoid the problem of fast chanfing model versions - - async def report_deploy_failure(excn: Exception) -> None: - res_type = resource_details.id.entity_type - log_line = data.LogLine.log( - logging.ERROR, - "All resources of type `%(res_type)s` failed to load handler code or install handler code " - "dependencies: `%(error)s`\n%(traceback)s", - res_type=res_type, - error=str(excn), - traceback="".join(traceback.format_tb(excn.__traceback__)), - ) - await self._client.resource_action_update( - tid=self._environment, - resource_ids=[resource_details.rvid], - action_id=uuid.uuid4(), - action=ResourceAction.deploy, - started=datetime.datetime.now().astimezone(), - finished=datetime.datetime.now().astimezone(), - messages=[log_line], - status=const.ResourceState.unavailable, - ) - - # Find code - code, invalid_resources = await self._code_manager.get_code( - environment=self._environment, version=self._state.version, resource_types=self._state.get_types_for_agent(agent) - ) - - # Bail out if this failed - if resource_details.id.entity_type in invalid_resources: - await report_deploy_failure(invalid_resources[resource_details.id.entity_type]) - return - - # Get executor - my_executor: executor.Executor = await self._executor_manager.get_executor( - agent_name=agent, agent_uri="NO_URI", code=code - ) - failed_resources = my_executor.failed_resources - - # Bail out if this failed - if resource_details.id.entity_type in failed_resources: - await report_deploy_failure(failed_resources[resource_details.id.entity_type]) - return - - # DEPLOY!!! - gid = uuid.uuid4() - # FIXME: reason argument is not used - await my_executor.execute(gid, resource_details, "New Scheduler initiated action") - - async def _work_once(self, agent: str) -> None: - task: work.Task = await self._work.agent_queues.queue_get(agent) - # FIXME[#8008]: skip and reschedule deploy / refresh-fact task if resource marked as update pending? - if isinstance(task, PoisonPill): - # wake up and return, queue will be shut down - return - resource_details: ResourceDetails - async with self._scheduler_lock: - # fetch resource details atomically under lock - try: - resource_details = self._state.resources[task.resource] - except KeyError: - # Stale resource, can simply be dropped. - # May occur in rare races between new_version and acquiring the lock we're under here. This race is safe - # because of this check, and an intrinsic part of the locking design because it's preferred over wider - # locking for performance reasons. 
- return - - await self._run_task(agent, task, resource_details) - - # post-processing - match task: - case work.Deploy(): - async with self._scheduler_lock: - # refresh resource details for latest model state - new_details: Optional[ResourceDetails] = self._state.resources.get(task.resource, None) - if new_details is not None and new_details.attribute_hash == resource_details.attribute_hash: - # FIXME[#8010]: pass success/failure to notify_provides() - # FIXME[#8008]: iff deploy was successful set resource status and deployment result - # in self.state.resources - self._work.notify_provides(task) - # The deploy that finished has become stale (state has changed since the deploy started). - # Nothing to report on a stale deploy. - # A new deploy for the current model state will have been queued already. - case _: - # nothing to do - pass - self._work.agent_queues.task_done(agent) - async def _run_for_agent(self, agent: str) -> None: """Main loop for one agent""" while self._running: - await self._work_once(agent) + task: Task = await self._work.agent_queues.queue_get(agent) + try: + # FIXME[#8008]: skip and reschedule deploy / refresh-fact task if resource marked as update pending? + await task.execute(self, agent) + except Exception: + LOGGER.exception("Task %s for agent %s has failed and the exception was not properly handled", task, agent) + + self._work.agent_queues.task_done(agent) diff --git a/src/inmanta/deploy/state.py b/src/inmanta/deploy/state.py index e0ce97f437..f33dc6b311 100644 --- a/src/inmanta/deploy/state.py +++ b/src/inmanta/deploy/state.py @@ -95,6 +95,9 @@ class ResourceState: status: ResourceStatus deployment_result: DeploymentResult + def needs_deploy(self) -> bool: + return self.status == ResourceStatus.HAS_UPDATE or self.deployment_result != DeploymentResult.DEPLOYED + @dataclass(kw_only=True) class ModelState: diff --git a/src/inmanta/deploy/tasks.py b/src/inmanta/deploy/tasks.py new file mode 100644 index 0000000000..d809ddd129 --- /dev/null +++ b/src/inmanta/deploy/tasks.py @@ -0,0 +1,179 @@ +""" + Copyright 2024 Inmanta + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contact: code@inmanta.com +""" + +import abc +import datetime +import logging +import traceback +import uuid +from dataclasses import dataclass +from typing import Optional + +from inmanta import const, data +from inmanta.agent import executor +from inmanta.data.model import ResourceIdStr +from inmanta.deploy import scheduler, state + + +@dataclass(frozen=True, kw_only=True) +class Task(abc.ABC): + """ + Resource action task. Represents the execution of a specific resource action for a given resource. 
+ """ + + resource: ResourceIdStr + + @abc.abstractmethod + async def execute(self, scheduler: "scheduler.ResourceScheduler", agent: str) -> None: + """the scheduler is considered to be a friend class: access to internal members is expected""" + pass + + def delete_with_resource(self) -> bool: + return True + + +class PoisonPill(Task): + """ + Task to signal queue shutdown + + It is used to make sure all workers wake up to observe that they have been closed. + It functions mostly as a no-op + """ + + async def execute(self, scheduler: "scheduler.ResourceScheduler", agent: str) -> None: + pass + + +class WithHashMatchTask(Task): + + async def execute(self, scheduler: "scheduler.ResourceScheduler", agent: str) -> None: + resource_details: "state.ResourceDetails" + async with scheduler._scheduler_lock: + # fetch resource details atomically under lock + try: + resource_details = scheduler._state.resources[self.resource] + except KeyError: + # Stale resource, can simply be dropped. + # May occur in rare races between new_version and acquiring the lock we're under here. This race is safe + # because of this check, and an intrinsic part of the locking design because it's preferred over wider + # locking for performance reasons. + return + await self.execute_on_resource(scheduler, agent, resource_details) + + @abc.abstractmethod + async def execute_on_resource( + self, scheduler: "scheduler.ResourceScheduler", agent: str, resource_details: "state.ResourceDetails" + ) -> None: + pass + + +class Deploy(WithHashMatchTask): + async def execute_on_resource( + self, scheduler: "scheduler.ResourceScheduler", agent: str, resource_details: "state.ResourceDetails" + ) -> None: + status = await self.do_deploy(scheduler, agent, resource_details) + + is_success = status == const.ResourceState.deployed + + async with scheduler._scheduler_lock: + # refresh resource details for latest model state + new_details: Optional[state.ResourceDetails] = scheduler._state.resources.get(self.resource, None) + my_state: state.ResourceState | None = scheduler._state.resource_state.get(self.resource, None) + if new_details is not None and new_details.attribute_hash == resource_details.attribute_hash: + assert my_state is not None + if is_success: + my_state.status = state.ResourceStatus.UP_TO_DATE + my_state.deployment_result = state.DeploymentResult.DEPLOYED + else: + # FIXME[#8008]: WDB to Sander: do we set status here as well? + my_state.deployment_result = state.DeploymentResult.FAILED + scheduler._work.notify_provides(self) + # The deploy that finished has become stale (state has changed since the deploy started). + # Nothing to report on a stale deploy. + # A new deploy for the current model state will have been queued already. + + async def do_deploy( + self, scheduler: "scheduler.ResourceScheduler", agent: str, resource_details: "state.ResourceDetails" + ) -> "const.ResourceState": + # FIXME: WDB to Sander: is the version of the state the correct version? + # It may happen that the set of types no longer matches the version? 
# FIXME: code loading interface is not nice like this, + # - we may want to track modules per agent, instead of types + # - we may also want to track the module version vs the model version + # as it avoids the problem of fast changing model versions + + async def report_deploy_failure(excn: Exception) -> None: + res_type = resource_details.id.entity_type + log_line = data.LogLine.log( + logging.ERROR, + "All resources of type `%(res_type)s` failed to load handler code or install handler code " + "dependencies: `%(error)s`\n%(traceback)s", + res_type=res_type, + error=str(excn), + traceback="".join(traceback.format_tb(excn.__traceback__)), + ) + await scheduler._client.resource_action_update( + tid=scheduler._environment, + resource_ids=[resource_details.rvid], + action_id=uuid.uuid4(), + action=const.ResourceAction.deploy, + started=datetime.datetime.now().astimezone(), + finished=datetime.datetime.now().astimezone(), + messages=[log_line], + status=const.ResourceState.unavailable, + ) + + # Find code + code, invalid_resources = await scheduler._code_manager.get_code( + environment=scheduler._environment, + version=scheduler._state.version, + resource_types=scheduler._state.get_types_for_agent(agent), + ) + + # Bail out if this failed + if resource_details.id.entity_type in invalid_resources: + await report_deploy_failure(invalid_resources[resource_details.id.entity_type]) + return const.ResourceState.unavailable + + # Get executor + my_executor: executor.Executor = await scheduler._executor_manager.get_executor( + agent_name=agent, agent_uri="NO_URI", code=code ) + failed_resources = my_executor.failed_resources + + # Bail out if this failed + if resource_details.id.entity_type in failed_resources: + await report_deploy_failure(failed_resources[resource_details.id.entity_type]) + return const.ResourceState.unavailable + + # DEPLOY!!! + gid = uuid.uuid4() + # FIXME: reason argument is not used + return await my_executor.execute(gid, resource_details, "New Scheduler initiated action") + + +@dataclass(frozen=True, kw_only=True) +class DryRun(Task): + version: int + + def delete_with_resource(self) -> bool: + return False + + +class RefreshFact(WithHashMatchTask): + pass diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 2873353aa7..175b0a4609 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -16,57 +16,23 @@ Contact: code@inmanta.com """ -import abc import asyncio import functools import itertools -import typing from collections.abc import Iterator, Mapping, Set from dataclasses import dataclass -from typing import Callable, Generic, Optional, TypeAlias, TypeVar +from typing import Callable, Generic, Optional, TypeVar from inmanta.data.model import ResourceIdStr +from inmanta.deploy import tasks from inmanta.resources import Id - -@dataclass(frozen=True, kw_only=True) -class _Task(abc.ABC): - """ - Resource action task. Represents the execution of a specific resource action for a given resource. - """ - - resource: ResourceIdStr - - -class Deploy(_Task): - pass - - -@dataclass(frozen=True, kw_only=True) -class DryRun(_Task): - version: int - - -class RefreshFact(_Task): - pass - - -class PoisonPill(_Task): - """ - Task to signal queue shutdown - - It is used to make sure all workers wake up to observe that they have been closed. - It functions mostly as a no-op - """ - - -Task: TypeAlias = Deploy | DryRun | RefreshFact | PoisonPill """ Type alias for the union of all task types. Allows exhaustive case matches.
""" -T = TypeVar("T", bound=Task, covariant=True) +T = TypeVar("T", bound=tasks.Task, covariant=True) @dataclass(frozen=True, kw_only=True) @@ -92,7 +58,7 @@ class TaskQueueItem: allowed. """ - task: PrioritizedTask[Task] + task: PrioritizedTask[tasks.Task] insert_order: int # Mutable state @@ -109,7 +75,7 @@ def __lt__(self, other: object) -> bool: return (self.task.priority, self.insert_order) < (other.task.priority, other.insert_order) -class AgentQueues(Mapping[Task, PrioritizedTask[Task]]): +class AgentQueues(Mapping[tasks.Task, PrioritizedTask[tasks.Task]]): """ Per-agent priority queue for ready-to-execute tasks. Clients must not interact with the underlying priority queues directly, only through the queue manipulation methods offered by this class. @@ -130,7 +96,7 @@ def __init__(self, consumer_factory: Callable[[str], None]) -> None: # => take approach suggested in heapq docs: simply mark as deleted. # => Keep view on all active tasks for a given resource, which also doubles as lookup for client operations # Only tasks in this collection are considered queued, as far as this class' client interface is concerned. - self._tasks_by_resource: dict[ResourceIdStr, dict[Task, TaskQueueItem]] = {} + self._tasks_by_resource: dict[ResourceIdStr, dict[tasks.Task, TaskQueueItem]] = {} # monotonically rising value for item insert order # use simple counter rather than time.monotonic_ns() for performance reasons self._entry_count: int = 0 @@ -151,26 +117,26 @@ def _get_queue(self, agent_name: str) -> asyncio.PriorityQueue[TaskQueueItem]: self._consumer_factory(agent_name) return out - def get_tasks_for_resource(self, resource: ResourceIdStr) -> set[Task]: + def get_tasks_for_resource(self, resource: ResourceIdStr) -> set[tasks.Task]: """ Returns all queued tasks for a given resource id. """ return set(self._tasks_by_resource.get(resource, {}).keys()) - def remove(self, task: Task) -> int: + def remove(self, task: tasks.Task) -> int: """ Removes the given task from its associated agent queue. Raises KeyError if it is not in the queue. Returns the priority at which the deleted task was queued. """ - tasks: dict[Task, TaskQueueItem] = self._tasks_by_resource.get(task.resource, {}) - queue_item: TaskQueueItem = tasks[task] + the_tasks: dict[tasks.Task, TaskQueueItem] = self._tasks_by_resource.get(task.resource, {}) + queue_item: TaskQueueItem = the_tasks[task] queue_item.deleted = True - del tasks[task] - if not tasks: + del the_tasks[task] + if not the_tasks: del self._tasks_by_resource[task.resource] return queue_item.task.priority - def discard(self, task: Task) -> Optional[int]: + def discard(self, task: tasks.Task) -> Optional[int]: """ Removes the given task from its associated agent queue if it is present. Returns the priority at which the deleted task was queued, if it was at all. 
@@ -180,10 +146,10 @@ def discard(self, task: Task) -> Optional[int]: except KeyError: return None - def _queue_item_for_task(self, task: Task) -> Optional[TaskQueueItem]: + def _queue_item_for_task(self, task: tasks.Task) -> Optional[TaskQueueItem]: return self._tasks_by_resource.get(task.resource, {}).get(task, None) - def sorted(self, agent: str) -> list[PrioritizedTask[Task]]: + def sorted(self, agent: str) -> list[PrioritizedTask[tasks.Task]]: # FIXME[#8008]: remove this method: it's only a PoC to hightlight how to achieve a sorted view queue: asyncio.PriorityQueue[TaskQueueItem] = self._agent_queues[agent] backing_heapq: list[TaskQueueItem] = queue._queue # type: ignore [attr-defined] @@ -194,11 +160,11 @@ def sorted(self, agent: str) -> list[PrioritizedTask[Task]]: # asyncio.PriorityQueue-like interface # ######################################## - def queue_put_nowait(self, prioritized_task: PrioritizedTask[Task]) -> None: + def queue_put_nowait(self, prioritized_task: PrioritizedTask[tasks.Task]) -> None: """ Add a new task to the associated agent's queue. """ - task: Task = prioritized_task.task + task: tasks.Task = prioritized_task.task priority: int = prioritized_task.priority already_queued: Optional[TaskQueueItem] = self._queue_item_for_task(task) if already_queued is not None and already_queued.task.priority <= priority: @@ -219,14 +185,14 @@ def queue_put_nowait(self, prioritized_task: PrioritizedTask[Task]) -> None: def send_shutdown(self) -> None: """Wake up all wrokers after shutdown is signalled""" poison_pill = TaskQueueItem( - task=PrioritizedTask(task=PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), priority=0), + task=PrioritizedTask(task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), priority=0), insert_order=self._entry_count, ) self._entry_count += 1 for queue in self._agent_queues.values(): queue.put_nowait(poison_pill) - async def queue_get(self, agent: str) -> Task: + async def queue_get(self, agent: str) -> tasks.Task: """ Consume a task from an agent's queue. If the queue is empty, blocks until a task becomes available. """ @@ -254,10 +220,10 @@ def task_done(self, agent: str) -> None: # Mapping implementation# ######################### - def __getitem__(self, key: Task) -> PrioritizedTask[Task]: + def __getitem__(self, key: tasks.Task) -> PrioritizedTask[tasks.Task]: return self._tasks_by_resource.get(key.resource, {})[key].task - def __iter__(self) -> Iterator[Task]: + def __iter__(self) -> Iterator[tasks.Task]: return itertools.chain.from_iterable(self._tasks_by_resource.values()) def __len__(self) -> int: @@ -270,7 +236,7 @@ class BlockedDeploy: Deploy task that is blocked on one or more of its dependencies (subset of its requires relation). 
""" - task: PrioritizedTask[Deploy] + task: PrioritizedTask["tasks.Deploy"] blocked_on: set[ResourceIdStr] @@ -359,7 +325,7 @@ def is_scheduled(resource: ResourceIdStr) -> bool: # definitely not scheduled return False # finally, check more expensive agent queue - task: Deploy = Deploy(resource=resource) + task: tasks.Deploy = tasks.Deploy(resource=resource) if task in self.agent_queues: # populate cache queued.add(resource) @@ -383,7 +349,7 @@ def extend_requires(resource: ResourceIdStr, added_requires: set[ResourceIdStr]) # # discard rather than remove because task may already be running, in which case we leave it run its course # and simply add a new one - task: Deploy = Deploy(resource=resource) + task: tasks.Deploy = tasks.Deploy(resource=resource) priority: Optional[int] = self.agent_queues.discard(task) queued.remove(resource) self.waiting[resource] = BlockedDeploy( @@ -408,7 +374,7 @@ def extend_requires(resource: ResourceIdStr, added_requires: set[ResourceIdStr]) } self.waiting[resource] = BlockedDeploy( # FIXME[#8015]: priority - task=PrioritizedTask(task=Deploy(resource=resource), priority=0), + task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=0), blocked_on=blocked_on, ) not_scheduled.discard(resource) @@ -448,25 +414,12 @@ def delete_resource(self, resource: ResourceIdStr) -> None: if resource in self.waiting: del self.waiting[resource] # additionally delete from agent_queues if a task is already queued - task: Task + task: tasks.Task for task in self.agent_queues.get_tasks_for_resource(resource): - delete: bool - match task: - case Deploy(): - delete = True - case DryRun(): - delete = False - case RefreshFact(): - delete = True - case PoisonPill(): - # don't care, the end is near - delete = False - case _ as _never: - typing.assert_never(_never) - if delete: + if task.delete_with_resource(): self.agent_queues.discard(task) - def notify_provides(self, finished_deploy: Deploy) -> None: + def notify_provides(self, finished_deploy: "tasks.Deploy") -> None: # FIXME[#8010]: consider failure scenarios -> check how current agent does it, e.g. skip-for-undefined # FIXME[#8008]: docstring + mention under lock + mention only iff not stale resource: ResourceIdStr = finished_deploy.resource diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 471083c5a0..f303d248d0 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1255,7 +1255,9 @@ async def __do_start_agent( """ Start an autostarted agent process for the given environment. Should only be called if none is running yet. 
""" - config: str = await self._make_agent_config(env, connection=connection) + use_resource_scheduler: bool = opt.server_use_resource_scheduler.get() + + config: str = await self._make_agent_config(env, connection=connection, scheduler=use_resource_scheduler) config_dir = os.path.join(self._server_storage["agents"], str(env.id)) if not os.path.exists(config_dir): @@ -1270,8 +1272,6 @@ async def __do_start_agent( agent_log = os.path.join(self._server_storage["logs"], "agent-%s.log" % env.id) - use_resource_scheduler: bool = opt.server_use_resource_scheduler.get() - proc: subprocess.Process = await self._fork_inmanta( [ "--log-file-level", @@ -1297,6 +1297,7 @@ async def _make_agent_config( env: data.Environment, *, connection: Optional[asyncpg.connection.Connection], + scheduler: bool = False, ) -> str: """ Generate the config file for the process that hosts the autostarted agents @@ -1374,7 +1375,18 @@ async def _make_agent_config( config += """ ssl=True """ - + if scheduler: + config += f""" +[database] +wait_time={opt.db_wait_time.get()} +host={opt.db_host.get()} +port={opt.db_port.get()} +name={opt.db_name.get()} +username={opt.db_username.get()} +password={opt.db_password.get()} + + """ + # TODO: connection pool return config async def _fork_inmanta( @@ -1500,6 +1512,10 @@ async def environment_action_created(self, env: model.Environment) -> None: return env_db = await data.Environment.get_by_id(env.id) - await self._ensure_scheduler(env_db) # We need to make sure that the AGENT_SCHEDULER is registered to be up and running await self._agent_manager.ensure_agent_registered(env_db, const.AGENT_SCHEDULER_ID) + if not no_auto_start_scheduler: + await self._ensure_scheduler(env_db) + + +no_auto_start_scheduler = False diff --git a/src/inmanta/server/services/databaseservice.py b/src/inmanta/server/services/databaseservice.py index c4728a6ea4..7b84194b3a 100644 --- a/src/inmanta/server/services/databaseservice.py +++ b/src/inmanta/server/services/databaseservice.py @@ -70,25 +70,7 @@ def get_dependencies(self) -> list[str]: async def connect_database(self) -> None: """Connect to the database""" - database_host = opt.db_host.get() - database_port = opt.db_port.get() - - database_username = opt.db_username.get() - database_password = opt.db_password.get() - connection_pool_min_size = opt.db_connection_pool_min_size.get() - connection_pool_max_size = opt.db_connection_pool_max_size.get() - connection_timeout = opt.db_connection_timeout.get() - self._pool = await data.connect( - database_host, - database_port, - opt.db_name.get(), - database_username, - database_password, - connection_pool_min_size=connection_pool_min_size, - connection_pool_max_size=connection_pool_max_size, - connection_timeout=connection_timeout, - ) - LOGGER.info("Connected to PostgreSQL database %s on %s:%d", opt.db_name.get(), database_host, database_port) + self._pool = await server_db_connect() # Check if JIT is enabled async with self._pool.acquire() as connection: @@ -149,3 +131,25 @@ async def _report_database_pool_exhaustion(self) -> None: async def _check_database_pool_exhaustion(self) -> None: assert self._db_pool_watcher is not None # Make mypy happy self._db_pool_watcher.check_for_pool_exhaustion() + + +async def server_db_connect() -> asyncpg.pool.Pool: + database_host = opt.db_host.get() + database_port = opt.db_port.get() + database_username = opt.db_username.get() + database_password = opt.db_password.get() + connection_pool_min_size = opt.db_connection_pool_min_size.get() + connection_pool_max_size = 
opt.db_connection_pool_max_size.get() + connection_timeout = opt.db_connection_timeout.get() + out = await data.connect( + database_host, + database_port, + opt.db_name.get(), + database_username, + database_password, + connection_pool_min_size=connection_pool_min_size, + connection_pool_max_size=connection_pool_max_size, + connection_timeout=connection_timeout, + ) + LOGGER.info("Connected to PostgreSQL database %s on %s:%d", opt.db_name.get(), database_host, database_port) + return out diff --git a/src/inmanta/server/services/orchestrationservice.py b/src/inmanta/server/services/orchestrationservice.py index f655656ca9..f71c149d77 100644 --- a/src/inmanta/server/services/orchestrationservice.py +++ b/src/inmanta/server/services/orchestrationservice.py @@ -1274,6 +1274,7 @@ async def deploy( is_using_new_scheduler = opt.server_use_resource_scheduler.get() if is_using_new_scheduler: await self.autostarted_agent_manager._ensure_scheduler(env) + allagents = [const.AGENT_SCHEDULER_ID] else: await self.autostarted_agent_manager._ensure_agents(env, allagents) diff --git a/tests/agent_server/deploy/e2e/conftest.py b/tests/agent_server/deploy/e2e/conftest.py new file mode 100644 index 0000000000..9b428fb22b --- /dev/null +++ b/tests/agent_server/deploy/e2e/conftest.py @@ -0,0 +1,73 @@ +""" + Copyright 2024 Inmanta + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ + Contact: code@inmanta.com +""" + +import asyncio +import logging + +import pytest + +import inmanta.server.agentmanager as agentmanager +import utils +from agent_server.deploy.scheduler_test_util import ClientHelper, DummyCodeManager +from inmanta.agent.agent_new import Agent +from inmanta.agent.in_process_executor import InProcessExecutorManager +from inmanta.server import SLICE_AGENT_MANAGER +from inmanta.server.config import server_use_resource_scheduler + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="function") +async def auto_start_agent(server_config): + return False + + +@pytest.fixture(scope="function") +async def clienthelper(client, environment): + return ClientHelper(client, environment) + + +@pytest.fixture(scope="function") +async def server_config(server_config, auto_start_agent): + agentmanager.no_auto_start_scheduler = not auto_start_agent + server_use_resource_scheduler.set("True") + yield server_config + agentmanager.no_auto_start_scheduler = False + + +@pytest.fixture(scope="function") +async def agent(server, environment): + """Construct an agent that can execute using the resource container""" + agentmanager = server.get_slice(SLICE_AGENT_MANAGER) + + a = Agent(environment) + + executor = InProcessExecutorManager( + environment, a._client, asyncio.get_event_loop(), logger, a.thread_pool, a._storage["code"], a._storage["env"], False + ) + a.executor_manager = executor + a.scheduler._executor_manager = executor + a.scheduler._code_manager = DummyCodeManager(a._client) + + await a.start() + + await utils.retry_limited(lambda: len(agentmanager.sessions) == 1, 10) + + yield a + + await a.stop() diff --git a/tests/agent_server/deploy/e2e/test_autostarted.py b/tests/agent_server/deploy/e2e/test_autostarted.py new file mode 100644 index 0000000000..7705d9e8d6 --- /dev/null +++ b/tests/agent_server/deploy/e2e/test_autostarted.py @@ -0,0 +1,86 @@ +""" + Copyright 2024 Inmanta + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contact: code@inmanta.com +""" + +import asyncio +import logging +import uuid + +import pytest + +from inmanta import const, data +from utils import _wait_until_deployment_finishes + +logger = logging.getLogger("inmanta.test.server_agent") + + +@pytest.mark.parametrize("auto_start_agent", (True,)) # this overrides a fixture to allow the agent to fork! 
+async def test_auto_deploy_no_splay(server, client, clienthelper, resource_container, environment, no_agent_backoff): + """ + Verify that the new scheduler can actually fork + """ + resource_container.Provider.reset() + env = await data.Environment.get_by_id(uuid.UUID(environment)) + await env.set(data.AUTOSTART_AGENT_MAP, {"internal": "", "agent1": ""}) + await env.set(data.AUTOSTART_ON_START, True) + + version = await clienthelper.get_version() + + resources = [ + { + "key": "key1", + "value": "value1", + "id": "test::Resource[agent1,key=key1],v=%d" % version, + "send_event": False, + "purged": False, + "requires": ["test::Resource[agent1,key=key2],v=%d" % version], + }, + { + "key": "key2", + "value": "value2", + "id": "test::Resource[agent1,key=key2],v=%d" % version, + "send_event": False, + "purged": False, + "requires": [], + }, + ] + + # set auto deploy and push + result = await client.set_setting(environment, data.AUTO_DEPLOY, True) + assert result.code == 200 + + await clienthelper.put_version_simple(resources, version) + + # check deploy + await _wait_until_deployment_finishes(client, environment, version) + result = await client.get_version(environment, version) + assert result.code == 200 + assert result.result["model"]["released"] + assert result.result["model"]["total"] == 2 + assert result.result["model"]["result"] == "failed" + + # check if agent 1 is started by the server + # deploy will fail because handler code is not uploaded to the server + result = await client.list_agents(tid=environment) + assert result.code == 200 + + while len(result.result["agents"]) == 0 or result.result["agents"][0]["state"] == "down": + result = await client.list_agents(tid=environment) + await asyncio.sleep(0.1) + + assert len(result.result["agents"]) == 1 + assert result.result["agents"][0]["name"] == const.AGENT_SCHEDULER_ID diff --git a/tests/agent_server/deploy/e2e/test_resource_handler_new.py b/tests/agent_server/deploy/e2e/test_resource_handler_new.py new file mode 100644 index 0000000000..b51ba01d43 --- /dev/null +++ b/tests/agent_server/deploy/e2e/test_resource_handler_new.py @@ -0,0 +1,160 @@ +""" + Copyright 2021 Inmanta + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ + Contact: code@inmanta.com +""" + +# COPY OF EXISTING +import base64 +import logging +import typing +from typing import TypeVar + +import pytest + +from inmanta import const +from inmanta.agent import Agent +from inmanta.agent.handler import ResourceHandler +from inmanta.protocol import SessionClient, VersionMatch, common +from utils import _wait_until_deployment_finishes, log_contains, make_random_file + +T = TypeVar("T") + + +class MockSessionClient(SessionClient): + def __init__(self, return_code, content): + self._version_match = VersionMatch.highest + self.return_code = return_code + self.content = content + + def get_file(self, hash_id): + content = b"" + if self.return_code != 404: + content = base64.b64encode(self.content) + return common.Result(self.return_code, result={"content": content}) + + +class MockGetFileResourceHandler(ResourceHandler): + def __init__(self, client): + self._client = client + + def run_sync(self, func: typing.Callable[[], T]) -> T: + return func() + + +def test_get_file_corrupted(): + (hash, content, body) = make_random_file() + client = MockSessionClient(200, b"corrupted_file") + resource_handler = MockGetFileResourceHandler(client) + + with pytest.raises(Exception): + resource_handler.get_file(hash) + + +def test_get_file_success(): + (hash, content, body) = make_random_file() + client = MockSessionClient(200, content) + resource_handler = MockGetFileResourceHandler(client) + + result = resource_handler.get_file(hash) + assert content == result + + +def test_get_file_not_found(): + client = MockSessionClient(404, None) + resource_handler = MockGetFileResourceHandler(client) + result = resource_handler.get_file("hash") + assert result is None + + +async def test_logging_error(resource_container, environment, client, agent, clienthelper, caplog): + """ + When a log call uses an argument that is not JSON serializable, the corresponding resource should be marked as failed, + and the exception logged. + """ + resource_container.Provider.reset() + version = await clienthelper.get_version() + + res_id_1 = "test::BadLogging[agent1,key=key1],v=%d" % version + resources = [ + { + "key": "key1", + "value": "value1", + "id": res_id_1, + "send_event": False, + "purged": False, + "requires": [], + }, + ] + + await clienthelper.put_version_simple(resources, version) + + result = await client.release_version(environment, version, True, const.AgentTriggerMethod.push_full_deploy) + assert result.code == 200 + + result = await client.get_version(environment, version) + assert result.code == 200 + + await _wait_until_deployment_finishes(client, environment, version) + result = await client.get_resource(tid=environment, id=res_id_1, logs=False, status=True) + assert result.code == 200 + assert result.result["status"] == "failed" + + log_contains(caplog, "conftest.agent1", logging.ERROR, "Failed to serialize argument for log message") + + +@pytest.mark.parametrize( + "resource_type", + ["test::FailFast", "test::FailFastCRUD", "test::BadPost", "test::BadPostCRUD"], +) +async def test_formatting_exception_messages( + resource_container, environment: str, client, agent: Agent, clienthelper, resource_type: str +) -> None: + """ + Ensure that exception raised in the Handler are correctly formatted in the resource action log. + Special characters should not be escaped (see: inmanta/inmanta-lsm#699). 
+ """ + resource_container.Provider.reset() + version = await clienthelper.get_version() + res_id_1 = f"{resource_type}[agent1,key=key1],v={version}" + resources = [ + { + "key": "key1", + "value": "value1", + "id": res_id_1, + "send_event": False, + "purged": False, + "requires": [], + **({"purge_on_delete": False} if resource_type.endswith("CRUD") else {}), + }, + ] + + await clienthelper.put_version_simple(resources, version) + result = await client.release_version(environment, version, True, const.AgentTriggerMethod.push_full_deploy) + assert result.code == 200 + await _wait_until_deployment_finishes(client, environment, version) + + result = await client.get_resource_actions( + tid=environment, + resource_type=resource_type, + agent="agent1", + log_severity=const.LogLevel.ERROR.value, + limit=1, + ) + assert result.code == 200, result.result + assert len(result.result["data"]) == 1 + error_messages = [msg for msg in result.result["data"][0]["messages"] if msg["level"] == const.LogLevel.ERROR.value] + assert len(error_messages) == 1, error_messages + assert "(exception: Exception('An\nError\tMessage')" in error_messages[0]["msg"] diff --git a/tests/agent_server/deploy/test_scheduler_e2e.py b/tests/agent_server/deploy/e2e/test_scheduler_e2e_deploy.py similarity index 62% rename from tests/agent_server/deploy/test_scheduler_e2e.py rename to tests/agent_server/deploy/e2e/test_scheduler_e2e_deploy.py index ca7683ead4..9482611691 100644 --- a/tests/agent_server/deploy/test_scheduler_e2e.py +++ b/tests/agent_server/deploy/e2e/test_scheduler_e2e_deploy.py @@ -16,49 +16,16 @@ Contact: code@inmanta.com """ -import asyncio import logging -import pytest - -import utils -from agent_server.deploy.scheduler_test_util import DummyCodeManager -from inmanta import config -from inmanta.agent.agent_new import Agent -from inmanta.agent.in_process_executor import InProcessExecutorManager -from inmanta.config import Config -from inmanta.server import SLICE_AGENT_MANAGER -from inmanta.util import get_compiler_version, groupby -from utils import resource_action_consistency_check, retry_limited +from agent_server.deploy.scheduler_test_util import wait_full_success +from inmanta import const +from inmanta.deploy.state import DeploymentResult +from utils import resource_action_consistency_check logger = logging.getLogger(__name__) -@pytest.fixture(scope="function") -async def agent(server, environment): - """Construct an agent that can execute using the resource container""" - agentmanager = server.get_slice(SLICE_AGENT_MANAGER) - - config.Config.set("config", "agent-deploy-interval", "0") - config.Config.set("config", "agent-repair-interval", "0") - a = Agent(environment) - - executor = InProcessExecutorManager( - environment, a._client, asyncio.get_event_loop(), logger, a.thread_pool, a._storage["code"], a._storage["env"], False - ) - a.executor_manager = executor - a.scheduler._executor_manager = executor - a.scheduler._code_manager = DummyCodeManager(a._client) - - await a.start() - - await utils.retry_limited(lambda: len(agentmanager.sessions) == 1, 10) - - yield a - - await a.stop() - - async def test_basics(agent, resource_container, clienthelper, client, environment): """ This tests make sure the resource scheduler is working as expected for these parts: @@ -69,15 +36,12 @@ async def test_basics(agent, resource_container, clienthelper, client, environme env_id = environment scheduler = agent.scheduler - # First part - test the ResourceScheduler (retrieval of data from DB) - Config.set("config", 
"agent-deploy-interval", "100") - Config.set("server", "new-resource-scheduler", "True") - resource_container.Provider.reset() # set the deploy environment resource_container.Provider.set("agent1", "key", "value") resource_container.Provider.set("agent2", "key", "value") resource_container.Provider.set("agent3", "key", "value") + resource_container.Provider.set_fail("agent1", "key3", 2) async def make_version(is_different=False): """ @@ -137,78 +101,74 @@ async def make_version(is_different=False): ) return version, resources - async def wait_for_resources(version: int, n: int) -> None: - result = await client.get_version(env_id, version) - assert result.code == 200 - - def done_per_agent(result): - done = [x for x in result.result["resources"] if x["status"] == "deployed"] - peragent = groupby(done, lambda x: x["agent"]) - return {agent: len([x for x in grp]) for agent, grp in peragent} - - def mindone(result): - alllist = done_per_agent(result).values() - if len(alllist) == 0: - return 0 - return min(alllist) - - async def done(): - result = await client.get_version(env_id, version) - return mindone(result) < n - - await retry_limited(done, 10) - logger.info("setup done") version1, resources = await make_version() - result = await client.put_version( - tid=env_id, version=version1, resources=resources, unknowns=[], version_info={}, compiler_version=get_compiler_version() - ) - assert result.code == 200 + await clienthelper.put_version_simple(version=version1, resources=resources) logger.info("first version pushed") # deploy and wait until one is ready - result = await client.release_version(env_id, version1, push=False) + result = await client.release_version(env_id, version1) assert result.code == 200 + await clienthelper.wait_for_released(version1) + logger.info("first version released") # timeout on single thread! - await wait_for_resources(version1, n=1) - await check_scheduler_state(resources, scheduler) - await clienthelper.wait_for_deployed(version1) + await clienthelper.wait_for_deployed() await resource_action_consistency_check() + await check_server_state_vs_scheduler_state(client, environment, scheduler) - version1, resources = await make_version(True) - result = await client.put_version( - tid=env_id, version=version1, resources=resources, unknowns=[], version_info={}, compiler_version=get_compiler_version() - ) + # check states + result = await client.resource_list(environment, deploy_summary=True) assert result.code == 200 + summary = result.result["metadata"]["deploy_summary"] + # {'by_state': {'available': 3, 'cancelled': 0, 'deployed': 12, 'deploying': 0, 'failed': 0, 'skipped': 0, + # 'skipped_for_undefined': 0, 'unavailable': 0, 'undefined': 0}, 'total': 15} + print(summary) + assert 10 == summary["by_state"]["deployed"] + assert 1 == summary["by_state"]["failed"] + assert 4 == summary["by_state"]["skipped"] + + version1, resources = await make_version(True) + await clienthelper.put_version_simple(version=version1, resources=resources) # deploy and wait until one is ready result = await client.release_version(env_id, version1, push=False) - assert result.code == 200 - - # all deployed! 
- async def done(): - result = await client.resource_list(environment, deploy_summary=True) - assert result.code == 200 - summary = result.result["metadata"]["deploy_summary"] - # {'by_state': {'available': 3, 'cancelled': 0, 'deployed': 12, 'deploying': 0, 'failed': 0, 'skipped': 0, - # 'skipped_for_undefined': 0, 'unavailable': 0, 'undefined': 0}, 'total': 15} - total = summary["total"] - deployed = summary["by_state"]["deployed"] - return total == deployed + await clienthelper.wait_for_released(version1) - await retry_limited(done, 10) + await clienthelper.wait_for_deployed() await check_scheduler_state(resources, scheduler) await resource_action_consistency_check() + # deploy trigger + await client.deploy(environment, agent_trigger_method=const.AgentTriggerMethod.push_incremental_deploy) + + await wait_full_success(client, environment) + + +async def check_server_state_vs_scheduler_state(client, environment, scheduler): + result = await client.resource_list(environment, deploy_summary=True) + assert result.code == 200 + for item in result.result["data"]: + the_id = item["resource_id"] + status = item["status"] + state = scheduler._state.resource_state[the_id] + + state_correspondence = { + "deployed": DeploymentResult.DEPLOYED, + "skipped": DeploymentResult.FAILED, + "failed": DeploymentResult.FAILED, + } + + assert state_correspondence[status] == state.deployment_result + async def check_scheduler_state(resources, scheduler): # State consistency check diff --git a/tests/agent_server/deploy/scheduler_test_util.py b/tests/agent_server/deploy/scheduler_test_util.py index e803b09569..94e0eea30d 100644 --- a/tests/agent_server/deploy/scheduler_test_util.py +++ b/tests/agent_server/deploy/scheduler_test_util.py @@ -21,15 +21,18 @@ import uuid from typing import Collection, Mapping, Set +import utils from inmanta.agent import executor from inmanta.agent.code_manager import CodeManager from inmanta.agent.executor import ExecutorBlueprint, ResourceInstallSpec from inmanta.data import ResourceIdStr from inmanta.data.model import LEGACY_PIP_DEFAULT, ResourceType from inmanta.deploy.state import ResourceDetails +from inmanta.protocol import Client from inmanta.protocol.common import custom_json_encoder from inmanta.resources import Id from inmanta.types import JsonType +from utils import retry_limited def make_requires(resources: Mapping[ResourceIdStr, ResourceDetails]) -> Mapping[ResourceIdStr, Set[ResourceIdStr]]: @@ -88,3 +91,43 @@ async def get_code( self, environment: uuid.UUID, version: int, resource_types: Collection[ResourceType] ) -> tuple[Collection[ResourceInstallSpec], executor.FailedResources]: return ([ResourceInstallSpec(rt, version, dummyblueprint) for rt in resource_types], {}) + + +async def _wait_until_deployment_finishes(client: Client, environment: str, version: int = -1, timeout: int = 10) -> None: + """Interface kept for backward compat""" + + async def done(): + result = await client.resource_list(environment, deploy_summary=True) + assert result.code == 200 + summary = result.result["metadata"]["deploy_summary"] + # {'by_state': {'available': 3, 'cancelled': 0, 'deployed': 12, 'deploying': 0, 'failed': 0, 'skipped': 0, + # 'skipped_for_undefined': 0, 'unavailable': 0, 'undefined': 0}, 'total': 15} + print(summary) + available = summary["by_state"]["available"] + deploying = summary["by_state"]["deploying"] + return available + deploying == 0 + + await retry_limited(done, timeout) + + +async def wait_full_success(client: Client, environment: str, version: int = -1, timeout: int
= 10) -> None: + """Interface kept for backward compat""" + + async def done(): + result = await client.resource_list(environment, deploy_summary=True) + assert result.code == 200 + summary = result.result["metadata"]["deploy_summary"] + # {'by_state': {'available': 3, 'cancelled': 0, 'deployed': 12, 'deploying': 0, 'failed': 0, 'skipped': 0, + # 'skipped_for_undefined': 0, 'unavailable': 0, 'undefined': 0}, 'total': 15} + print(summary) + total = summary["total"] + success = summary["by_state"]["deployed"] + return total == success + + await retry_limited(done, timeout) + + +class ClientHelper(utils.ClientHelper): + + async def wait_for_deployed(self, version: int = -1) -> None: + await _wait_until_deployment_finishes(self.client, self.environment) diff --git a/tests/agent_server/deploy/test_scheduler_agent.py b/tests/agent_server/deploy/test_scheduler_agent.py index a7c89f4f59..9015a3664f 100644 --- a/tests/agent_server/deploy/test_scheduler_agent.py +++ b/tests/agent_server/deploy/test_scheduler_agent.py @@ -29,6 +29,7 @@ import inmanta.types from agent_server.deploy.scheduler_test_util import DummyCodeManager, make_requires +from inmanta import const from inmanta.agent import executor from inmanta.agent.agent_new import Agent from inmanta.agent.executor import ResourceDetails, ResourceInstallSpec @@ -45,8 +46,9 @@ def __init__(self): self.execute_count = 0 self.failed_resources = {} - async def execute(self, gid: uuid.UUID, resource_details: ResourceDetails, reason: str) -> None: + async def execute(self, gid: uuid.UUID, resource_details: ResourceDetails, reason: str) -> const.ResourceState: self.execute_count += 1 + return const.ResourceState.deployed async def dry_run(self, resources: Sequence[ResourceDetails], dry_run_id: uuid.UUID) -> None: pass diff --git a/tests/agent_server/test_resource_view.py b/tests/agent_server/test_resource_view.py index 3c44f30c89..c1055f5994 100644 --- a/tests/agent_server/test_resource_view.py +++ b/tests/agent_server/test_resource_view.py @@ -24,8 +24,9 @@ async def test_consistent_resource_state_reporting( - server, agent_factory, environment, resource_container, clienthelper, client, no_agent_backoff + server, agent, environment, resource_container, clienthelper, client, no_agent_backoff ) -> None: + """Does not work with the new scheduler, as every release triggers a deploy""" env = await data.Environment.get_by_id(uuid.UUID(environment)) await env.set(data.AUTOSTART_AGENT_MAP, {"internal": "", "agent1": ""}) await env.set(data.AUTO_DEPLOY, False) @@ -34,8 +35,6 @@ async def test_consistent_resource_state_reporting( await env.set(data.AUTOSTART_AGENT_REPAIR_INTERVAL, 0) await env.set(data.AUTOSTART_ON_START, True) - agent = await agent_factory(environment=environment, hostname="agent1") - rid = "test::Resource[agent1,key=key1]" rid2 = "test::Resource[agent1,key=key2]" diff --git a/tests/utils.py b/tests/utils.py index 47a1a535bb..1112a714b2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -425,6 +425,9 @@ async def put_version_simple(self, resources: list[dict[str, Any]], version: int if wait_for_released: await retry_limited(functools.partial(self.is_released, version), timeout=1, interval=0.05) + async def wait_for_released(self, version: int) -> None: + await retry_limited(functools.partial(self.is_released, version), timeout=1, interval=0.05) + async def is_released(self, version: int) -> bool: versions = await self.client.list_versions(tid=self.environment) assert versions.code == 200
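
A note on the scheduling semantics introduced by this patch: the dirty set in `ResourceScheduler.deploy()` now selects resources via `needs_deploy()` instead of checking `status == ResourceStatus.HAS_UPDATE` alone, so a resource whose last deploy did not succeed is rescheduled even when its intent is unchanged. The sketch below is a minimal, self-contained illustration of that predicate; it mirrors only the enum members and fields visible in this diff rather than importing the real `inmanta.deploy.state` module, so the member values are illustrative stand-ins, not the actual definitions.

# Standalone sketch (not part of the patch): enumerates which combinations of
# (status, deployment_result) end up in the scheduler's dirty set.
import enum
from dataclasses import dataclass


class ResourceStatus(enum.Enum):
    # only the two members used in this diff; the real enum may define more
    UP_TO_DATE = enum.auto()
    HAS_UPDATE = enum.auto()


class DeploymentResult(enum.Enum):
    DEPLOYED = enum.auto()
    FAILED = enum.auto()


@dataclass
class ResourceState:
    status: ResourceStatus
    deployment_result: DeploymentResult

    def needs_deploy(self) -> bool:
        # dirty iff the desired state changed or the last deploy did not succeed
        return self.status == ResourceStatus.HAS_UPDATE or self.deployment_result != DeploymentResult.DEPLOYED


if __name__ == "__main__":
    for status in ResourceStatus:
        for result in DeploymentResult:
            rs = ResourceState(status=status, deployment_result=result)
            print(f"{status.name:>10} / {result.name:<8} -> needs_deploy={rs.needs_deploy()}")

Only the `UP_TO_DATE` / `DEPLOYED` combination stays out of the dirty set. That lines up with the post-processing in `Deploy.execute_on_resource()` above: a failed, non-stale deploy sets `deployment_result = FAILED` without touching `status`, so the next `deploy()` pass picks the resource up again.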