Skip to content

Commit

Permalink
Merge pull request openwallet-foundation#614 from sklump/openshift-po…
Browse files Browse the repository at this point in the history
…d-support

Openshift pod support
Signed-off-by: Nicholas Rempel <[email protected]>
  • Loading branch information
ianco authored Jul 20, 2020
2 parents 0ab2391 + 529b541 commit 2236e4c
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 184 deletions.
88 changes: 85 additions & 3 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,23 @@ class AdminStatusSchema(Schema):
"""Schema for the status endpoint."""


class AdminStatusLivelinessSchema(Schema):
"""Schema for the liveliness endpoint."""

alive = fields.Boolean(description="Liveliness status", example=True)


class AdminStatusReadinessSchema(Schema):
"""Schema for the liveliness endpoint."""

ready = fields.Boolean(description="Readiness status", example=True)


class AdminResponder(BaseResponder):
"""Handle outgoing messages from message handlers."""

def __init__(
self, context: InjectionContext, send: Coroutine, webhook: Coroutine, **kwargs
self, context: InjectionContext, send: Coroutine, webhook: Coroutine, **kwargs,
):
"""
Initialize an instance of `AdminResponder`.
Expand Down Expand Up @@ -111,6 +123,19 @@ def topic_filter(self, val: Sequence[str]):
self._topic_filter = filter


@web.middleware
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
"""Only continue if application is ready to take work."""

if str(request.rel_url).rstrip("/") in (
"/status/live",
"/status/ready",
) or request.app._state.get("ready"):
return await handler(request)

raise web.HTTPServiceUnavailable(reason="Shutdown in progress")


class AdminServer(BaseAdminServer):
"""Admin HTTP server class."""

Expand All @@ -121,6 +146,7 @@ def __init__(
context: InjectionContext,
outbound_message_router: Coroutine,
webhook_router: Callable,
conductor_stop: Coroutine,
task_queue: TaskQueue = None,
conductor_stats: Coroutine = None,
):
Expand All @@ -133,6 +159,7 @@ def __init__(
context: The application context instance
outbound_message_router: Coroutine for delivering outbound messages
webhook_router: Callable for delivering webhooks
conductor_stop: Conductor (graceful) stop for shutdown API call
task_queue: An optional task queue for handlers
"""
self.app = None
Expand All @@ -142,6 +169,7 @@ def __init__(
)
self.host = host
self.port = port
self.conductor_stop = conductor_stop
self.conductor_stats = conductor_stats
self.loaded_modules = []
self.task_queue = task_queue
Expand All @@ -152,14 +180,14 @@ def __init__(

self.context = context.start_scope("admin")
self.responder = AdminResponder(
self.context, outbound_message_router, self.send_webhook
self.context, outbound_message_router, self.send_webhook,
)
self.context.injector.bind_instance(BaseResponder, self.responder)

async def make_application(self) -> web.Application:
"""Get the aiohttp application instance."""

middlewares = [validation_middleware]
middlewares = [ready_middleware, validation_middleware]

# admin-token and admin-token are mutually exclusive and required.
# This should be enforced during parameter parsing but to be sure,
Expand Down Expand Up @@ -220,6 +248,9 @@ async def collect_stats(request, handler):
web.get("/plugins", self.plugins_handler, allow_head=False),
web.get("/status", self.status_handler, allow_head=False),
web.post("/status/reset", self.status_reset_handler),
web.get("/status/live", self.liveliness_handler, allow_head=False),
web.get("/status/ready", self.readiness_handler, allow_head=False),
web.get("/shutdown", self.shutdown_handler, allow_head=False),
web.get("/ws", self.websocket_handler, allow_head=False),
]
)
Expand Down Expand Up @@ -284,6 +315,7 @@ async def start(self) -> None:

try:
await self.site.start()
self.app._state["ready"] = True
except OSError:
raise AdminSetupError(
"Unable to start webserver with host "
Expand All @@ -292,6 +324,7 @@ async def start(self) -> None:

async def stop(self) -> None:
"""Stop the webserver."""
self.app._state["ready"] = False # in case call does not come through OpenAPI
for queue in self.websocket_queues.values():
queue.stop()
if self.site:
Expand Down Expand Up @@ -370,6 +403,54 @@ async def redirect_handler(self, request: web.BaseRequest):
"""Perform redirect to documentation."""
raise web.HTTPFound("/api/doc")

@docs(tags=["server"], summary="Liveliness check")
@response_schema(AdminStatusLivelinessSchema(), 200)
async def liveliness_handler(self, request: web.BaseRequest):
"""
Request handler for liveliness check.
Args:
request: aiohttp request object
Returns:
The web response, always indicating True
"""
return web.json_response({"alive": True})

@docs(tags=["server"], summary="Readiness check")
@response_schema(AdminStatusReadinessSchema(), 200)
async def readiness_handler(self, request: web.BaseRequest):
"""
Request handler for liveliness check.
Args:
request: aiohttp request object
Returns:
The web response, indicating readiness for further calls
"""
return web.json_response({"ready": self.app._state["ready"]})

@docs(tags=["server"], summary="Shut down server")
async def shutdown_handler(self, request: web.BaseRequest):
"""
Request handler for server shutdown.
Args:
request: aiohttp request object
Returns:
The web response (empty production)
"""
self.app._state["ready"] = False
loop = asyncio.get_event_loop()
asyncio.ensure_future(self.conductor_stop(), loop=loop)

return web.json_response({})

async def websocket_handler(self, request):
"""Send notifications to admin client over websocket."""

Expand Down Expand Up @@ -451,6 +532,7 @@ async def websocket_handler(self, request):
if msg:
await ws.send_json(msg)
send = loop.create_task(queue.dequeue(timeout=5.0))

except asyncio.CancelledError:
closed = True

Expand Down
Loading

0 comments on commit 2236e4c

Please sign in to comment.