From 8195fd2d7a826d88b68be33aea6a8b6570d7c08b Mon Sep 17 00:00:00 2001 From: Per Johansson Date: Wed, 17 Jan 2024 11:31:47 +0100 Subject: [PATCH] Use bookmark events for better watch resumption during idle periods. It appears that the bookmark events are sent roughly once per minute, although the specification does not promise any particular interval. A timeout of 4.5 minutes still seem reasonable keeping that in mind. --- k8s/base.py | 95 ++++++++++++++++++++++++++++++++------- k8s/config.py | 10 +++-- k8s/watcher.py | 14 +++--- tests/k8s/test_base.py | 21 ++++++++- tests/k8s/test_watcher.py | 30 +++++++++++-- 5 files changed, 140 insertions(+), 30 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index e92853d..dae0d5b 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -15,6 +15,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations +from abc import ABC + import json import logging from collections import namedtuple @@ -53,7 +56,7 @@ def __new__(mcs, cls, bases, attrs): "watch_list_url": getattr(attr_meta, "watch_list_url", ""), "watch_list_url_template": getattr(attr_meta, "watch_list_url_template", ""), "fields": [], - "field_names": [] + "field_names": [], } field_names = meta["field_names"] fields = meta["fields"] @@ -74,6 +77,7 @@ class ApiMixIn(object): Contains methods for working with the API """ + _client = Client() @classmethod @@ -118,15 +122,25 @@ def list(cls, namespace="default"): return [cls.from_dict(item) for item in resp.json()["items"]] @classmethod - def watch_list(cls, namespace=None, resource_version=None): - """Return a generator that yields WatchEvents of cls""" + def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False): + """Return a generator that yields WatchEvents of cls. + If allowBookmarks is True, WatchBookmarks will also be yielded. + It's recommended to use the Watcher class instead of calling this directly, + since it handles reconnects and resource versions. + """ url = cls._watch_list_url(namespace) + # We don't pass timeoutSeconds to the server, since our timeout is between each event, + # while the server will apply the timeout as a maximum time serving the full request, + # hanging up regardless of time between events. Let the server decide that timeout. params = {} if resource_version: # As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch # only resourceVersion is used for watch queries. params["resourceVersion"] = resource_version + LOG.info(f"Restarting %s watch at resource version %s", cls.__name__, resource_version) + if allow_bookmarks: + params["allowWatchBookmarks"] = "true" try: # The timeout here appears to be per call to the poll (or similar) system call, @@ -166,20 +180,26 @@ def _watch_list_url(cls, namespace): return url @classmethod - def _parse_watch_event(cls, line): + def _parse_watch_event(cls, line) -> WatchBaseEvent: """ - Parse a line from the watch stream into a WatchEvent. + Parse a line from the watch stream into a WatchEvent or WatchBookmark. Raises APIServerError if the line is an error event. """ try: event_json = json.loads(line) if APIServerError.match(event_json): + err = APIServerError(event_json["object"]) LOG.warning( "Received error event from API server: %s", - event_json["object"]["message"], + err, ) - raise APIServerError(event_json["object"]) - event = WatchEvent(event_json, cls) + raise err + if WatchBookmark.match(event_json): + LOG.debug("Received bookmark from API server: %s", event_json) + event = WatchBookmark(event_json) + else: + LOG.debug("Received watch event from API server: %s", event_json) + event = WatchEvent(event_json, cls) return event except TypeError: LOG.exception( @@ -248,7 +268,7 @@ def save_status(self): @staticmethod def _label_selector(labels): - """ Build a labelSelector string from a collection of key/values. The parameter can be either + """Build a labelSelector string from a collection of key/values. The parameter can be either a dict, or a list of (key, value) tuples (this allows for repeating a key). The keys/values are used to build the `labelSelector` parameter to the API, @@ -297,7 +317,8 @@ def __init__(self, new=True, **kwargs): field.default_value_create_instance = False if kwarg_names: raise TypeError( - "{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names))) + "{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names)) + ) if self._new: self._validate_fields() @@ -339,8 +360,10 @@ def from_dict(cls, d): return instance def __repr__(self): - return "{}({})".format(self.__class__.__name__, - ", ".join("{}={}".format(key, getattr(self, key)) for key in self._meta.field_names)) + return "{}({})".format( + self.__class__.__name__, + ", ".join("{}={}".format(key, getattr(self, key)) for key in self._meta.field_names), + ) def __eq__(self, other): try: @@ -353,22 +376,58 @@ def _api_name(name): return name[1:] if name.startswith("_") else name -class WatchEvent(object): +class WatchBaseEvent(ABC): + """Abstract base class for Watch events. + Contains the resource version of the event as property resource_version.""" + + __slots__ = ("resource_version",) + + def __init__(self, event_json): + self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + + def __eq__(self, other): + return self.resource_version == other.resource_version + + def has_object(self): + ... + + +class WatchEvent(WatchBaseEvent): ADDED = "ADDED" MODIFIED = "MODIFIED" DELETED = "DELETED" def __init__(self, event_json, cls): + super(WatchEvent, self).__init__(event_json) self.type = event_json["type"] self.object = cls.from_dict(event_json["object"]) def __repr__(self): - return "{cls}(type={type}, object={object})".format(cls=self.__class__.__name__, type=self.type, - object=self.object) + return "{cls}(type={type}, object={object})".format( + cls=self.__class__.__name__, type=self.type, object=self.object + ) def __eq__(self, other): return self.type == other.type and self.object == other.object + def has_object(self): + return True + + +class WatchBookmark(WatchBaseEvent): + """Bookmark events, if enabled, are sent periodically by the API server. + They only contain the resourceVersion of the event.""" + + def __init__(self, event_json): + super(WatchBookmark, self).__init__(event_json) + + @classmethod + def match(cls, event_json): + return event_json["type"] == "BOOKMARK" + + def has_object(self): + return False + class LabelSelector(object): """Base for label select operations""" @@ -426,6 +485,7 @@ class MyModel(Model): submodel = Field(SelfModel) # submodel gets the type `MyModel` ``` """ + pass @@ -436,7 +496,10 @@ def __init__(self, api_error): self.api_error = api_error def __str__(self): - return self.api_error["message"] + code = self.api_error["code"] + reason = self.api_error.get("reason", "(unset)") + message = self.api_error.get("message", "(unset)") + return f"{code}: reason={reason} message={message}" @classmethod def match(cls, event_json): diff --git a/k8s/config.py b/k8s/config.py index 2725ecd..38a9018 100644 --- a/k8s/config.py +++ b/k8s/config.py @@ -43,7 +43,8 @@ #: * When connecting, a resourceVersion is used to resume, if still valid. This speaks for a low value, #: to avoid them expiring. #: * During idle periods, there might not be any new resourceVersions. -#: Therefore a full quorum read each time a timeout is reached. +#: Bookmarks events are used to avoid this, they are sent at a server +#: specific interval, but usually about once per minute. #: This speaks for a high value. #: 4.5 minutes is the default, set to detect the first case above in a reasonable time, #:while being just below the default resourceVersion expiration of 5 minutes. @@ -53,8 +54,10 @@ # disables bandit warning for this line which triggers because the string contains 'token', which is fine -def use_in_cluster_config(token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec - ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"): +def use_in_cluster_config( + token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec + ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", +): """ Configure the client using the recommended configuration for accessing the API from within a Kubernetes cluster: https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod @@ -72,6 +75,7 @@ class FileTokenSource(object): Intended to support the BoundServiceAccountTokenVolume feature in Kubernetes 1.21 and later. """ + def __init__(self, token_file, now_func=datetime.now): self._token_file = token_file self._expires_at = datetime(MINYEAR, 1, 1) # force read on initial call to _refresh_token diff --git a/k8s/watcher.py b/k8s/watcher.py index 2bbd68a..f00517e 100644 --- a/k8s/watcher.py +++ b/k8s/watcher.py @@ -27,11 +27,12 @@ class Watcher(object): """Higher-level interface to watch for changes in objects The low-level :py:meth:`~.watch_list` method will stop when the API-server drops the connection. - When reconnecting, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED` + When reconnecting using that method, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED` events for all objects, even if they have been seen before. The Watcher will hide this complexity for you, and make sure to reconnect when the connection drops, and skip events that have already been seen. + It additionally uses bookmarks to avoid the increased load that might be caused by reconnecting. :param Model model: The model class to watch :param int capacity: How many seen objects to keep track of @@ -55,15 +56,14 @@ def watch(self, namespace=None): while self._run_forever: try: for event in self._model.watch_list( - namespace=namespace, resource_version=last_seen_resource_version + namespace=namespace, resource_version=last_seen_resource_version, allow_bookmarks=True ): + last_seen_resource_version = event.resource_version + if not event.has_object(): + continue o = event.object key = (o.metadata.name, o.metadata.namespace) - last_seen_resource_version = o.metadata.resourceVersion - if ( - self._seen.get(key) == o.metadata.resourceVersion - and event.type != WatchEvent.DELETED - ): + if self._seen.get(key) == o.metadata.resourceVersion and event.type != WatchEvent.DELETED: continue self._seen[key] = o.metadata.resourceVersion yield event diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index c597afd..a14506a 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -20,7 +20,7 @@ import requests import requests.packages.urllib3 as urllib3 -from k8s.base import Model, Field, WatchEvent, Equality, Inequality, In, NotIn, Exists +from k8s.base import APIServerError, Equality, Exists, Field, In, Inequality, Model, NotIn, WatchBookmark, WatchEvent from k8s.models.common import DeleteOptions, Preconditions @@ -146,3 +146,22 @@ def test_watch_list_with_timeout(self, client): assert list(gen) == [] assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) + + def test_watch_list_api_error(self, client): + client.get.return_value.iter_lines.return_value = [ + '{"type": "ERROR", "object": {"kind":"Status", "code": 500, "message": "Internal Server Error"}}', + ] + gen = Example.watch_list() + with pytest.raises(APIServerError, match="Internal Server Error"): + next(gen) + + def test_watch_list_bookmark(self, client): + client.get.return_value.iter_lines.return_value = [ + '{"type":"BOOKMARK", "object":{"metadata":{"resourceVersion": 4712}}}', + ] + gen = Example.watch_list(resource_version=4711, allow_bookmarks=True) + assert next(gen) == WatchBookmark({"type": "BOOKMARK", "object": {"metadata": {"resourceVersion": 4712}}}) + assert list(gen) == [] + client.get.assert_called_once_with( + "/watch/example", stream=True, timeout=270, params={"resourceVersion": 4711, "allowWatchBookmarks": "true"} + ) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 1912fad..860cbbd 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -19,7 +19,7 @@ import mock import pytest -from k8s.base import APIServerError, Model, Field, WatchEvent +from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent from k8s.models.common import ObjectMeta from k8s.watcher import Watcher @@ -48,7 +48,7 @@ def test_multiple_events(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - api_watch_list.assert_called_with(namespace=None, resource_version=None) + api_watch_list.assert_called_with(namespace=None, resource_version=None, allow_bookmarks=True) def test_handle_reconnect(self, api_watch_list): events = [_event(0, ADDED, 1)] @@ -115,7 +115,7 @@ def stop_iteration(*args, **kwargs): assert list(gen) == [] - api_watch_list.assert_called_with(namespace=namespace, resource_version=None) + api_watch_list.assert_called_with(namespace=namespace, resource_version=None, allow_bookmarks=True) def test_handle_410(self, api_watch_list): watcher = Watcher(WatchListExample) @@ -134,6 +134,30 @@ def test_handle_410(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] + def test_other_apierror(self, api_watch_list): + watcher = Watcher(WatchListExample) + + api_watch_list.side_effect = APIServerError({"code": 400, "message": "Bad Request"}) + with pytest.raises(APIServerError, match="Bad Request"): + next(watcher.watch()) + + def test_bookmark(self, api_watch_list): + watcher = Watcher(WatchListExample) + + api_watch_list.return_value.__getitem__.side_effect = [ + _event(0, ADDED, 1), + WatchBookmark({"object": {"metadata": {"resourceVersion": "2"}}}), + _event(1, MODIFIED, 3), + ] + # Seal the mock to make sure __iter__ is not used instead of __getitem__ + mock.seal(api_watch_list) + + gen = watcher.watch() + _assert_event(next(gen), 0, ADDED, 1) + _assert_event(next(gen), 1, MODIFIED, 3) + watcher._run_forever = False + assert list(gen) == [] + def _event(id, event_type, rv, namespace="default"): metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv}