Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Extract the watching logic into a separate module #62

Merged
merged 2 commits into from
May 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 11 additions & 42 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@
from typing import Optional, Callable, Tuple, Union, MutableMapping, NewType

import aiojobs
import kubernetes.watch

from kopf.reactor.handling import custom_object_handler
from kopf.reactor.lifecycles import get_default_lifecycle
from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id
from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource
from kopf.reactor.watching import streaming_aiter
from kopf.reactor.watching import infinite_watch

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,46 +68,16 @@ async def watcher(
scheduler = await aiojobs.create_scheduler(limit=10)
queues = {}
try:
while True:

# Make a Kubernetes call to watch for the events via the API.
w = kubernetes.watch.Watch()
api = kubernetes.client.CustomObjectsApi()
api_fn = api.list_cluster_custom_object
stream = w.stream(api_fn, resource.group, resource.version, resource.plural)
async for event in streaming_aiter(stream):

# "410 Gone" is for the "resource version too old" error, we must restart watching.
# The resource versions are lost by k8s after few minutes (as per the official doc).
# The error occurs when there is nothing happening for few minutes. This is normal.
if event['type'] == 'ERROR' and event['object']['code'] == 410:
logger.debug("Restarting the watch-stream for %r", resource)
break # out of for-cycle, to the while-true-cycle.

# Other watch errors should be fatal for the operator.
if event['type'] == 'ERROR':
raise Exception(f"Error in the watch-stream: {event['object']}")

# Ensure that the event is something we understand and can handle.
if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
logger.warning("Ignoring an unsupported event type: %r", event)
continue

# Filter out all unrelated events as soon as possible (before queues), and silently.
# TODO: Reimplement via api.list_namespaced_custom_object, and API-level filtering.
ns = event['object'].get('metadata', {}).get('namespace', None)
if namespace is not None and ns is not None and ns != namespace:
continue

# Either use the existing object's queue, or create a new one together with the per-object job.
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
key = (resource, event['object']['metadata']['uid'])
try:
await queues[key].put(event)
except KeyError:
queues[key] = asyncio.Queue()
await queues[key].put(event)
await scheduler.spawn(worker(handler=handler, queues=queues, key=key))
# Either use the existing object's queue, or create a new one together with the per-object job.
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
async for event in infinite_watch(resource=resource, namespace=namespace):
key = (resource, event['object']['metadata']['uid'])
try:
await queues[key].put(event)
except KeyError:
queues[key] = asyncio.Queue()
await queues[key].put(event)
await scheduler.spawn(worker(handler=handler, queues=queues, key=key))

finally:
        # Forcibly terminate all the fire-and-forget per-object jobs, if they are still running.
Expand Down
71 changes: 71 additions & 0 deletions kopf/reactor/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@

import asyncio
import logging
from typing import Union

import kubernetes

from kopf.reactor.registry import Resource

logger = logging.getLogger(__name__)


class WatchingError(Exception):
    """
    Raised when the watch-stream API reports an error we cannot recover from.
    """


class StopStreaming(RuntimeError):
"""
Raised when the watch-stream generator ends streaming.
Expand Down Expand Up @@ -51,3 +62,63 @@ async def streaming_aiter(src, loop=None, executor=None):
yield await loop.run_in_executor(executor, streaming_next, src)
except StopStreaming:
return


async def streaming_watch(
        resource: Resource,
        namespace: Union[None, str],
):
    """
    Stream the watch-events from one single Kubernetes API watch-call.

    Yields only the events that are understood (ADDED/MODIFIED/DELETED)
    and relevant to the requested namespace. Returns (ends the stream)
    when the watch-call expires; raises `WatchingError` on fatal errors.

    :param resource: The custom resource (group/version/plural) to watch.
    :param namespace: Only yield events for this namespace; `None` means all.
    :raises WatchingError: For any ERROR event other than "410 Gone".
    """
    loop = asyncio.get_event_loop()
    w = kubernetes.watch.Watch()
    api = kubernetes.client.CustomObjectsApi()
    api_fn = api.list_cluster_custom_object
    stream = w.stream(api_fn, resource.group, resource.version, resource.plural)
    async for event in streaming_aiter(stream, loop=loop):

        # "410 Gone" is for the "resource version too old" error, we must restart watching.
        # The resource versions are lost by k8s after few minutes (as per the official doc).
        # The error occurs when there is nothing happening for few minutes. This is normal.
        # NOTE: `.get()` is used so that an ERROR status without a "code" field falls
        # through to the explicit WatchingError below instead of raising a KeyError.
        if event['type'] == 'ERROR' and event['object'].get('code') == 410:
            logger.debug("Restarting the watch-stream for %r", resource)
            break  # out of the for-cycle; the caller (e.g. infinite_watch) restarts the stream.

        # Other watch errors should be fatal for the operator.
        if event['type'] == 'ERROR':
            raise WatchingError(f"Error in the watch-stream: {event['object']}")

        # Ensure that the event is something we understand and can handle.
        if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
            logger.warning("Ignoring an unsupported event type: %r", event)
            continue

        # Filter out all unrelated events as soon as possible (before queues), and silently.
        # TODO: Reimplement via api.list_namespaced_custom_object, and API-level filtering.
        ns = event['object'].get('metadata', {}).get('namespace', None)
        if namespace is not None and ns is not None and ns != namespace:
            continue

        # Yield normal events to the consumer.
        yield event


async def infinite_watch(
        resource: Resource,
        namespace: Union[None, str],
):
    """
    Stream the watch-events infinitely.

    This routine is extracted into its own function only because the
    infinite loop is hard to test. It is kept as simple as possible
    and is assumed to work without explicit tests.

    This routine never ends gracefully: whenever a single watch-call's
    stream finishes (e.g. on expiry), a fresh one is started and the
    event stream continues. It exits only via unrecoverable exceptions.
    """
    while True:
        stream = streaming_watch(resource=resource, namespace=namespace)
        async for event in stream:
            yield event