Skip to content

Commit

Permalink
Merge pull request #571 from opsani/OPTSERV-1269-part-2-servox-async-…
Browse files Browse the repository at this point in the history
…logic-hardening

Optserv 1269 part 2 servox async logic hardening
  • Loading branch information
linkous8 authored May 7, 2024
2 parents f3ade34 + c12d58a commit b6caf80
Show file tree
Hide file tree
Showing 23 changed files with 931 additions and 596 deletions.
51 changes: 40 additions & 11 deletions servo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ class ServoStatuses(enum.StrEnum):
aborted = "aborted"
cancelled = "cancelled"

def from_error(error: Exception) -> ServoStatuses:
if isinstance(error, servo.errors.AdjustmentRejectedError):
return ServoStatuses.rejected
elif isinstance(error, servo.errors.EventAbortedError):
return ServoStatuses.aborted
elif isinstance(error, servo.errors.EventCancelledError):
return ServoStatuses.cancelled
else:
return ServoStatuses.failed


Statuses = Union[OptimizerStatuses, ServoStatuses]

Expand Down Expand Up @@ -105,6 +115,9 @@ class Config:
class Status(pydantic.BaseModel):
status: Statuses
message: Optional[str] = None
other_messages: Optional[
list[str]
] = None # other lower priority error in exception group
reason: Optional[str] = None
state: Optional[Dict[str, Any]] = None
descriptor: Optional[Dict[str, Any]] = None
Expand All @@ -120,18 +133,34 @@ def ok(
return cls(status=ServoStatuses.ok, message=message, reason=reason, **kwargs)

@classmethod
def from_error(cls, error: servo.errors.BaseError, **kwargs) -> "Status":
"""Return a status object representation from the given error."""
if isinstance(error, servo.errors.AdjustmentRejectedError):
status = ServoStatuses.rejected
elif isinstance(error, servo.errors.EventAbortedError):
status = ServoStatuses.aborted
elif isinstance(error, servo.errors.EventCancelledError):
status = ServoStatuses.cancelled
else:
def from_error(
cls, error: servo.errors.BaseError | ExceptionGroup, **kwargs
) -> "Status":
"""Return a status object representation from the given error (first if multiple in group)."""
if isinstance(error, ExceptionGroup):
servo.logger.warning(
f"from_error executed on exceptiongroup {error}. May produce undefined behavior"
)
status = ServoStatuses.failed

return cls(status=status, message=str(error), reason=error.reason, **kwargs)
try:
error = servo.errors.ServoError.servo_error_from_group(
exception_group=error
)
if error._additional_errors:
additional_messages = [str(e) for e in error._additional_errors]
kwargs["additional_messages"] = (
kwargs.get("additional_messages", []) + additional_messages
)
except Exception:
servo.logger.exception(
"Failed to derive exceptiongroup reason for api response"
)
pass

reason = getattr(error, "reason", ...)
status = ServoStatuses.from_error(error)

return cls(status=status, message=str(error), reason=reason, **kwargs)

def dict(
self,
Expand Down
35 changes: 11 additions & 24 deletions servo/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,14 @@ async def assemble(

telemetry = servo.telemetry.Telemetry()

# Initialize all active connectors
connectors: List[servo.connector.BaseConnector] = []
for name, connector_type in routes.items():
connector_config = getattr(servo_config, name)
if connector_config is not None:
connector = connector_type(
name=name,
config=connector_config,
pubsub_exchange=pubsub_exchange,
telemetry=telemetry,
__connectors__=connectors,
)
connectors.append(connector)

# Build the servo object
servo_ = servo.servo.Servo(
config=servo_config,
connectors=connectors.copy(), # Avoid self-referential reference to servo
telemetry=telemetry,
__connectors__=connectors,
pubsub_exchange=pubsub_exchange,
routes=routes,
)
connectors.append(servo_)
servo_.load_connectors()
servos.append(servo_)

assembly = cls(
Expand Down Expand Up @@ -224,12 +209,14 @@ async def add_servo(self, servo_: servo.servo.Servo) -> None:
"""
self.servos.append(servo_)

await servo.attach()
await servo_.attach()

if self.is_running:
await servo.startup()
await servo_.startup()

async def remove_servo(self, servo_: servo.servo.Servo) -> None:
async def remove_servo(
self, servo_: servo.servo.Servo, reason: str | None = None
) -> None:
"""Remove a servo from the assembly.
Before removal, the servo is sent the detach event to prepare for
Expand All @@ -239,10 +226,10 @@ async def remove_servo(self, servo_: servo.servo.Servo) -> None:
servo_: The servo to remove from the assembly.
"""

await servo.detach()
await servo_.detach()

if self.is_running:
await servo.shutdown()
await servo_.shutdown(reason)

self.servos.remove(servo_)

Expand All @@ -257,12 +244,12 @@ async def startup(self):
)
)

async def shutdown(self):
async def shutdown(self, reason: str | None = None):
"""Notify all servos that the assembly is shutting down."""
await asyncio.gather(
*list(
map(
lambda s: s.shutdown(),
lambda s: s.shutdown(reason=reason),
filter(
lambda s: s.is_running,
self.servos,
Expand Down
87 changes: 45 additions & 42 deletions servo/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,53 +757,56 @@ async def process_checks(

checks: list[Check] = functools.reduce(lambda a, b: a + b.value, results, [])

for check in checks:
if check.success:
# FIXME: This should hold Check objects but hashing isn't matching
if check.id not in passing:
# calling loguru with kwargs (component) triggers a str.format call which trips up on names with single curly braces
servo.logger.success(
f"✅ Check '{check.escaped_name}' passed",
component=check.id,
)
passing.add(check.id)
else:
failure = check
servo.logger.warning(
f"❌ Check '{failure.name}' failed ({len(passing)} passed): {failure.message}"
)
if failure.hint:
servo.logger.info(f"Hint: {failure.hint}")

if failure.exception:
servo.logger.opt(exception=failure.exception).debug(
"check.exception"
async with asyncio.TaskGroup() as tg:
for check in checks:
if check.success:
# FIXME: This should hold Check objects but hashing isn't matching
if check.id not in passing:
# calling loguru with kwargs (component) triggers a str.format call which trips up on names with single curly braces
servo.logger.success(
f"✅ Check '{check.escaped_name}' passed",
component=check.id,
)
passing.add(check.id)
else:
failure = check
servo.logger.warning(
f"❌ Check '{failure.name}' failed ({len(passing)} passed): {failure.message}"
)
if failure.hint:
servo.logger.info(f"Hint: {failure.hint}")

if failure.remedy:
if asyncio.iscoroutinefunction(failure.remedy):
task = asyncio.create_task(failure.remedy())
elif asyncio.iscoroutine(failure.remedy):
task = asyncio.create_task(failure.remedy)
else:
if failure.exception:
servo.logger.opt(exception=failure.exception).debug(
"check.exception"
)

async def fn() -> None:
result = failure.remedy()
if asyncio.iscoroutine(result):
await result

task = asyncio.create_task(fn())

if checks_config.remedy:
servo.logger.info("💡 Attempting to apply remedy...")
if failure.remedy and checks_config.remedy:
try:
await asyncio.wait_for(task, 10.0)
except asyncio.TimeoutError as error:
coro = None
async with asyncio.timeout(10.0):
if asyncio.iscoroutine(failure.remedy):
awaitable = failure.remedy
else:
if asyncio.iscoroutinefunction(failure.remedy):
coro = failure.remedy
else:

async def coro() -> None:
result = failure.remedy()
if asyncio.iscoroutine(result):
await result

awaitable = coro()

await tg.create_task(awaitable)
servo.logger.info("💡 Attempting to apply remedy...")

except asyncio.TimeoutError:
servo.logger.warning("💡 Remedy attempt timed out after 10s")
else:
task.cancel()
if checks_config.check_halting:
break

if checks_config.check_halting:
break

if not failure:
servo.logger.info("🔥 All checks passed.")
Expand Down
6 changes: 3 additions & 3 deletions servo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,9 +1150,9 @@ async def gather_checks():

if not dry_run:
poll = not no_poll
servo.runner.AssemblyRunner(context.assembly).run(
poll=poll,
interactive=bool(interactive),
servo.runner.AssemblyRunner(
context.assembly, poll=poll, interactive=bool(interactive)
).run(
debug=debug,
)

Expand Down
8 changes: 4 additions & 4 deletions servo/connectors/kube_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pathlib
import pydantic
import time
from typing import Any, Dict, List, Optional, FrozenSet, Union
from typing import Annotated, Any, Dict, List, Optional, FrozenSet, Union

import servo
from servo.checks import CheckError
Expand Down Expand Up @@ -99,9 +99,9 @@ class SupportedKubeMetrics(str, Enum):


class KubeMetricsConfiguration(servo.BaseConfiguration):
namespace: DNSSubdomainName = pydantic.Field(
description="Namespace of the target resource"
)
namespace: Annotated[
str, DNSSubdomainName(description="Namespace of the target resource")
]
name: str = pydantic.Field(description="Name of the target resource")
kind: str = pydantic.Field(
default="Deployment",
Expand Down
Loading

0 comments on commit b6caf80

Please sign in to comment.