Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add health monitor #111

Merged
merged 7 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions sanic_ext/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from sanic_ext.config import Config, add_fallback_config
from sanic_ext.extensions.base import Extension
from sanic_ext.extensions.health.extension import HealthExtension
from sanic_ext.extensions.http.extension import HTTPExtension
from sanic_ext.extensions.injection.extension import InjectionExtension
from sanic_ext.extensions.injection.registry import InjectionRegistry
Expand Down Expand Up @@ -65,14 +66,15 @@ def __init__(
if MIN_SUPPORT > sanic_version:
min_version = ".".join(map(str, MIN_SUPPORT))
raise SanicException(
f"SanicExt only works with Sanic v{min_version} and above. "
f"It looks like you are running {__version__}."
f"Sanic Extensions only works with Sanic v{min_version} "
f"and above. It looks like you are running {__version__}."
)

self.app = app
self._injection_registry: Optional[InjectionRegistry] = None
self._openapi: Optional[SpecificationBuilder] = None
self.app = app
self.extensions: List[Extension] = []
self._injection_registry: Optional[InjectionRegistry] = None
self.sanic_version = sanic_version
app._ext = self
app.ctx._dependencies = SimpleNamespace()

Expand All @@ -88,6 +90,7 @@ def __init__(
InjectionExtension,
OpenAPIExtension,
HTTPExtension,
HealthExtension,
]
)

Expand Down
16 changes: 16 additions & 0 deletions sanic_ext/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def __init__(
cors_send_wildcard: bool = False,
cors_supports_credentials: bool = False,
cors_vary_header: bool = True,
health: bool = False,
health_endpoint: bool = False,
health_max_misses: int = 3,
health_missed_threshhold: int = 10,
health_monitor: bool = True,
health_report_interval: int = 5,
health_uri_to_info: str = "",
health_url_prefix: str = "/__health__",
http_all_methods: bool = True,
http_auto_head: bool = True,
http_auto_options: bool = True,
Expand Down Expand Up @@ -59,6 +67,14 @@ def __init__(
self.CORS_SEND_WILDCARD = cors_send_wildcard
self.CORS_SUPPORTS_CREDENTIALS = cors_supports_credentials
self.CORS_VARY_HEADER = cors_vary_header
self.HEALTH = health
self.HEALTH_ENDPOINT = health_endpoint
self.HEALTH_MAX_MISSES = health_max_misses
self.HEALTH_MISSED_THRESHHOLD = health_missed_threshhold
self.HEALTH_MONITOR = health_monitor
self.HEALTH_REPORT_INTERVAL = health_report_interval
self.HEALTH_URI_TO_INFO = health_uri_to_info
self.HEALTH_URL_PREFIX = health_url_prefix
self.HTTP_ALL_METHODS = http_all_methods
self.HTTP_AUTO_HEAD = http_auto_head
self.HTTP_AUTO_OPTIONS = http_auto_options
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions sanic_ext/extensions/health/endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from sanic import Blueprint, Request, Sanic
from sanic.response import json
from sanic.worker.inspector import Inspector


def setup_health_endpoint(app: Sanic) -> None:
bp = Blueprint("SanicHealth", url_prefix=app.config.HEALTH_URL_PREFIX)

@bp.get(app.config.HEALTH_URI_TO_INFO)
async def info(request: Request):
return json(Inspector.make_safe(dict(request.app.m.workers)))

app.blueprint(bp)
27 changes: 27 additions & 0 deletions sanic_ext/extensions/health/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from sanic.exceptions import SanicException

from sanic_ext.extensions.health.endpoint import setup_health_endpoint

from ..base import Extension
from .monitor import HealthMonitor


class HealthExtension(Extension):
name = "health"
MIN_VERSION = (22, 9)

def startup(self, bootstrap) -> None:
if self.config.HEALTH:
if self.config.HEALTH_MONITOR:
if self.MIN_VERSION > bootstrap.sanic_version:
min_version = ".".join(map(str, self.MIN_VERSION))
sanic_version = ".".join(map(str, bootstrap.sanic_version))
raise SanicException(
f"Health monitoring only works with Sanic "
f"v{min_version} and above. It looks like you are "
f"running {sanic_version}."
)
HealthMonitor.setup(self.app)

if self.config.HEALTH_ENDPOINT:
setup_health_endpoint(self.app)
162 changes: 162 additions & 0 deletions sanic_ext/extensions/health/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from __future__ import annotations

from asyncio import sleep
from dataclasses import dataclass
from datetime import datetime, timedelta
from multiprocessing import Manager
from queue import Empty, Full
from signal import SIGINT, SIGTERM
from signal import signal as signal_func
from typing import TYPE_CHECKING, Optional

from sanic.application.constants import ServerStage
from sanic.log import logger

if TYPE_CHECKING:
from sanic import Sanic


class Stale(ValueError):
...


@dataclass
class HealthState:
name: str
last: Optional[datetime] = None
misses: int = 0

def report(self, timestamp: int) -> None:
logger.debug(f"Reporting {self.name}")
if self.misses:
logger.info(f"Recovered {self.name}")
self.last = datetime.fromtimestamp(timestamp)
self.misses = 0

def missed(self) -> None:
self.misses += 1
logger.info(
f"Missed health check for {self.name} "
f"({self.misses}/{HealthMonitor.MAX_MISSES})"
)
if self.misses >= HealthMonitor.MAX_MISSES:
raise Stale

def check(self) -> None:
if not self.last:
return

threshhold = timedelta(
seconds=(HealthMonitor.MISSED_THRESHHOLD * (self.misses + 1))
)
if self.last < (datetime.now() - threshhold):
self.missed()

def reset(self) -> None:
self.misses = 0
self.last = datetime.now()


def send_healthy(name, queue):
health = (name, datetime.now().timestamp())
logger.debug(f"Sending health: {health}", extra={"verbosity": 1})
try:
queue.put_nowait(health)
except Full:
...


async def health_check(app: Sanic):
sent = datetime.now()

while app.state.stage is ServerStage.SERVING:
now = datetime.now()
if sent < now - timedelta(seconds=HealthMonitor.REPORT_INTERVAL):
send_healthy(app.m.name, app.shared_ctx.health_queue)
sent = now
await sleep(0.1)


async def start_health_check(app: Sanic):
app.add_task(health_check(app), name="health_check")


async def prepare_health_monitor(app, *_):
HealthMonitor.prepare(app)


async def setup_health_monitor(app, *_):
health = HealthMonitor(app)
process_names = [
process.name for process in app.manager.transient_processes
]
app.manager.manage(
"HealthMonitor",
health,
{
"process_names": process_names,
"health_queue": app.shared_ctx.health_queue,
},
)


class HealthMonitor:
MAX_MISSES = 3
REPORT_INTERVAL = 5
MISSED_THRESHHOLD = 10

def __init__(self, app: Sanic):
self.run = True
self.monitor_publisher = app.manager.monitor_publisher

def __call__(self, process_names, health_queue) -> None:
signal_func(SIGINT, self.stop)
signal_func(SIGTERM, self.stop)

now = datetime.now()
health_state = {
process_name: HealthState(last=now, name=process_name)
for process_name in process_names
}
while self.run:
try:
name, timestamp = health_queue.get_nowait()
except Empty:
...
else:
health_state[name].report(timestamp)

for state in health_state.values():
try:
state.check()
except Stale:
state.reset()
self.monitor_publisher.send(state.name)

def stop(self, *_):
self.run = False

@classmethod
def prepare(cls, app: Sanic):
sync_manager = Manager()
health_queue = sync_manager.Queue(maxsize=app.state.workers * 2)
app.shared_ctx.health_queue = health_queue

@classmethod
def setup(
cls,
app: Sanic,
max_misses: Optional[int] = None,
report_interval: Optional[int] = None,
missed_threshhold: Optional[int] = None,
):
HealthMonitor.MAX_MISSES = max_misses or app.config.HEALTH_MAX_MISSES
HealthMonitor.REPORT_INTERVAL = (
report_interval or app.config.HEALTH_REPORT_INTERVAL
)
HealthMonitor.MISSED_THRESHHOLD = (
missed_threshhold or app.config.HEALTH_MISSED_THRESHHOLD
)
app.main_process_start(prepare_health_monitor)
app.main_process_ready(setup_health_monitor)
app.after_server_start(start_health_check)