Skip to content

Commit

Permalink
Merge pull request #132 from fiaas/list_watch
Browse files Browse the repository at this point in the history
Watcher: list, then watch to try to avoid yielding events for deleted resources
  • Loading branch information
tg90nor authored May 22, 2024
2 parents 4638004 + 107c9cd commit 5316841
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 110 deletions.
72 changes: 63 additions & 9 deletions k8s/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import json
import logging
from collections import namedtuple
from typing import Optional
from typing import Optional, Dict, List

import requests
import requests.packages.urllib3 as urllib3
Expand Down Expand Up @@ -111,7 +111,7 @@ def find(cls, name="", namespace="default", labels=None):
return [cls.from_dict(item) for item in resp.json()["items"]]

@classmethod
def list(cls, namespace="default"):
def _list_raw(cls, namespace="default"):
"""List all resources in given namespace"""
if namespace is None:
if not cls._meta.list_url:
Expand All @@ -120,8 +120,20 @@ def list(cls, namespace="default"):
else:
url = cls._build_url(name="", namespace=namespace)
resp = cls._client.get(url)
return resp

@classmethod
def list(cls, namespace="default"):
"""List all resources in given namespace"""
resp = cls._list_raw(namespace=namespace)
return [cls.from_dict(item) for item in resp.json()["items"]]

@classmethod
def list_with_meta(cls, namespace="default"):
"""List all resources in given namespace. Return ModelList"""
resp = cls._list_raw(namespace=namespace)
return ModelList.from_dict(cls, resp.json())

@classmethod
def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False):
"""Return a generator that yields WatchEvents of cls.
Expand All @@ -139,7 +151,7 @@ def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False
# As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch
# only resourceVersion is used for watch queries.
params["resourceVersion"] = resource_version
LOG.info("Restarting %s watch at resource version %s", cls.__name__, resource_version)
LOG.info("(Re)starting %s watch at resource version %s", cls.__name__, resource_version)
if allow_bookmarks:
params["allowWatchBookmarks"] = "true"

Expand Down Expand Up @@ -383,8 +395,11 @@ class WatchBaseEvent(ABC):

__slots__ = ("resource_version",)

def __init__(self, event_json):
self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion")
def __init__(self, event_json=None, resource_version=None):
if event_json is not None:
self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion")
else:
self.resource_version = resource_version

def __eq__(self, other):
return self.resource_version == other.resource_version
Expand All @@ -398,10 +413,21 @@ class WatchEvent(WatchBaseEvent):
MODIFIED = "MODIFIED"
DELETED = "DELETED"

def __init__(self, event_json, cls):
super(WatchEvent, self).__init__(event_json)
self.type = event_json["type"]
self.object = cls.from_dict(event_json["object"])
def __init__(self, event_json: dict = None, cls: type[Model] = None, _type: str = None, _object: Model = None):
if event_json is not None and cls is not None:
super(WatchEvent, self).__init__(event_json=event_json)
self.type = event_json["type"]
self.object = cls.from_dict(event_json["object"])
elif _type is not None and _object is not None:
# resource_version is effectively optional here to match the behavior for event_json in WatchBaseEvent
# in practice, watch events with None resourceVersion will break the caching in Watcher.watch()
resource_version = getattr(getattr(_object, "metadata", None), "resourceVersion", None)
super(WatchEvent, self).__init__(resource_version=resource_version)
self.type = _type
self.object = _object
else:
raise ValueError("requires either event_json and cls or _type and _object, " +
f"got {event_json=}, {cls=}, {_type=}, {_object=}")

def __repr__(self):
return "{cls}(type={type}, object={object})".format(
Expand All @@ -415,6 +441,11 @@ def has_object(self):
return True


class SyntheticAddedWatchEvent(WatchEvent):
def __init__(self, obj: Model):
super(SyntheticAddedWatchEvent, self).__init__(_type=WatchEvent.ADDED, _object=obj)


class WatchBookmark(WatchBaseEvent):
"""Bookmark events, if enabled, are sent periodically by the API server.
They only contain the resourceVersion of the event."""
Expand Down Expand Up @@ -505,3 +536,26 @@ def __str__(self):
@classmethod
def match(cls, event_json):
return event_json["type"] == "ERROR" and event_json["object"].get("kind") == "Status"


class ListMeta(Model):
_continue = Field(str)
remainingItemCount = Field(int)
resourceVersion = Field(str)


class ModelList:
"""
Generic type to hold list of Model instances (items) together with ListMeta (metadata),
as returned by list API calls
"""

def __init__(self, metadata: ListMeta, items: List[Model]):
self.metadata = metadata
self.items = items

@classmethod
def from_dict(cls, model_cls: type[Model], list_response_data: Dict):
metadata = ListMeta.from_dict(list_response_data.get('metadata', {}))
items = [model_cls.from_dict(item) for item in list_response_data.get('items', [])]
return cls(metadata, items)
16 changes: 15 additions & 1 deletion k8s/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@


import cachetools
import logging

from .base import APIServerError, WatchEvent
from .base import APIServerError, WatchEvent, SyntheticAddedWatchEvent

DEFAULT_CAPACITY = 1000

LOG = logging.getLogger(__name__)


class Watcher(object):
"""Higher-level interface to watch for changes in objects
Expand Down Expand Up @@ -54,6 +57,17 @@ def watch(self, namespace=None):
# Only used on reconnects, the first call to watch does a quorum read.
last_seen_resource_version = None
while self._run_forever:
if last_seen_resource_version is None:
# list all resources and yield a synthetic ADDED watch event for each
model_list = self._model.list_with_meta()
LOG.info("Got %d %s instances from quorum read", len(model_list.items), self._model.__name__)
for obj in model_list.items:
event = SyntheticAddedWatchEvent(obj)
# _should_yield is mainly called here to feed the self._seen cache
if self._should_yield(event):
yield event
# watch connection should start at the version of the initial list
last_seen_resource_version = model_list.metadata.resourceVersion
try:
for event in self._model.watch_list(
namespace=namespace, resource_version=last_seen_resource_version, allow_bookmarks=True
Expand Down
Loading

0 comments on commit 5316841

Please sign in to comment.