Skip to content

Commit

Permalink
Use bookmark events for better watch resumption during idle periods.
Browse files Browse the repository at this point in the history
It appears that the bookmark events are sent roughly once per minute,
although the specification does not promise any particular interval.
A timeout of 4.5 minutes still seems reasonable, keeping that in mind.
  • Loading branch information
perj committed Jan 17, 2024
1 parent a98aa84 commit 8195fd2
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
95 changes: 79 additions & 16 deletions k8s/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from abc import ABC

import json
import logging
from collections import namedtuple
Expand Down Expand Up @@ -53,7 +56,7 @@ def __new__(mcs, cls, bases, attrs):
"watch_list_url": getattr(attr_meta, "watch_list_url", ""),
"watch_list_url_template": getattr(attr_meta, "watch_list_url_template", ""),
"fields": [],
"field_names": []
"field_names": [],
}
field_names = meta["field_names"]
fields = meta["fields"]
Expand All @@ -74,6 +77,7 @@ class ApiMixIn(object):
Contains methods for working with the API
"""

_client = Client()

@classmethod
Expand Down Expand Up @@ -118,15 +122,25 @@ def list(cls, namespace="default"):
return [cls.from_dict(item) for item in resp.json()["items"]]

@classmethod
def watch_list(cls, namespace=None, resource_version=None):
"""Return a generator that yields WatchEvents of cls"""
def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False):
"""Return a generator that yields WatchEvents of cls.
If allow_bookmarks is True, WatchBookmark events will also be yielded.
It's recommended to use the Watcher class instead of calling this directly,
since it handles reconnects and resource versions.
"""
url = cls._watch_list_url(namespace)

# We don't pass timeoutSeconds to the server, since our timeout is between each event,
# while the server will apply the timeout as a maximum time serving the full request,
# hanging up regardless of time between events. Let the server decide that timeout.
params = {}
if resource_version:
# As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch
# only resourceVersion is used for watch queries.
params["resourceVersion"] = resource_version
LOG.info(f"Restarting %s watch at resource version %s", cls.__name__, resource_version)
if allow_bookmarks:
params["allowWatchBookmarks"] = "true"

try:
# The timeout here appears to be per call to the poll (or similar) system call,
Expand Down Expand Up @@ -166,20 +180,26 @@ def _watch_list_url(cls, namespace):
return url

@classmethod
def _parse_watch_event(cls, line):
def _parse_watch_event(cls, line) -> WatchBaseEvent:
"""
Parse a line from the watch stream into a WatchEvent.
Parse a line from the watch stream into a WatchEvent or WatchBookmark.
Raises APIServerError if the line is an error event.
"""
try:
event_json = json.loads(line)
if APIServerError.match(event_json):
err = APIServerError(event_json["object"])
LOG.warning(
"Received error event from API server: %s",
event_json["object"]["message"],
err,
)
raise APIServerError(event_json["object"])
event = WatchEvent(event_json, cls)
raise err
if WatchBookmark.match(event_json):
LOG.debug("Received bookmark from API server: %s", event_json)
event = WatchBookmark(event_json)
else:
LOG.debug("Received watch event from API server: %s", event_json)
event = WatchEvent(event_json, cls)
return event
except TypeError:
LOG.exception(
Expand Down Expand Up @@ -248,7 +268,7 @@ def save_status(self):

@staticmethod
def _label_selector(labels):
""" Build a labelSelector string from a collection of key/values. The parameter can be either
"""Build a labelSelector string from a collection of key/values. The parameter can be either
a dict, or a list of (key, value) tuples (this allows for repeating a key).
The keys/values are used to build the `labelSelector` parameter to the API,
Expand Down Expand Up @@ -297,7 +317,8 @@ def __init__(self, new=True, **kwargs):
field.default_value_create_instance = False
if kwarg_names:
raise TypeError(
"{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names)))
"{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names))
)
if self._new:
self._validate_fields()

Expand Down Expand Up @@ -339,8 +360,10 @@ def from_dict(cls, d):
return instance

def __repr__(self):
    """Return ``ClassName(field=value, ...)`` covering every name in ``self._meta.field_names``."""
    class_name = self.__class__.__name__
    # Each field is rendered as "name=value", in the order given by the model's meta information.
    rendered_fields = [f"{name}={getattr(self, name)}" for name in self._meta.field_names]
    return f"{class_name}({', '.join(rendered_fields)})"

def __eq__(self, other):
try:
Expand All @@ -353,22 +376,58 @@ def _api_name(name):
return name[1:] if name.startswith("_") else name


class WatchEvent(object):
class WatchBaseEvent(ABC):
    """Abstract base class for Watch events.

    Contains the resource version of the event as property resource_version.
    """

    __slots__ = ("resource_version",)

    def __init__(self, event_json):
        """Store the resourceVersion found under ``event_json["object"]["metadata"]``.

        ``resource_version`` is None when the object has no metadata or no resourceVersion.
        Raises KeyError if ``event_json`` has no "object" key.
        """
        self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion")

    def __eq__(self, other):
        """Compare two events by resource version.

        Returns NotImplemented when ``other`` has no ``resource_version`` attribute, so that
        comparing against an unrelated object evaluates to False instead of raising AttributeError.
        """
        if not hasattr(other, "resource_version"):
            return NotImplemented
        return self.resource_version == other.resource_version

    def has_object(self):
        """Tell whether the event carries an object.

        The base implementation returns None; WatchEvent and WatchBookmark override it.
        """
        ...


class WatchEvent(WatchBaseEvent):
    """Watch event that carries an object.

    ``type`` is the event type string taken from the stream (the class constants name the
    known object event types) and ``object`` is the result of ``cls.from_dict`` applied to
    the event's "object" payload.
    """

    ADDED = "ADDED"
    MODIFIED = "MODIFIED"
    DELETED = "DELETED"

    def __init__(self, event_json, cls):
        """Build the event from a decoded watch line.

        :param event_json: decoded event; must contain the keys "type" and "object"
        :param cls: class whose ``from_dict`` turns the "object" payload into an instance
        """
        super().__init__(event_json)
        self.type = event_json["type"]
        self.object = cls.from_dict(event_json["object"])

    def __repr__(self):
        return f"{self.__class__.__name__}(type={self.type}, object={self.object})"

    def __eq__(self, other):
        """Compare by event type and object; the resource version is not compared separately.

        Returns NotImplemented when ``other`` lacks ``type`` or ``object``, so that comparing
        against an unrelated object evaluates to False instead of raising AttributeError.
        """
        if not (hasattr(other, "type") and hasattr(other, "object")):
            return NotImplemented
        return self.type == other.type and self.object == other.object

    def has_object(self):
        """Always True: this event type carries an object."""
        return True


class WatchBookmark(WatchBaseEvent):
    """Bookmark event, sent periodically by the API server when bookmarks are enabled.

    A bookmark has no object attached; it only contributes the resourceVersion of the event.
    """

    def __init__(self, event_json):
        super().__init__(event_json)

    @classmethod
    def match(cls, event_json):
        """Return True when the decoded event is of type "BOOKMARK"."""
        event_type = event_json["type"]
        return event_type == "BOOKMARK"

    def has_object(self):
        """Always False: a bookmark carries no object."""
        return False


class LabelSelector(object):
"""Base for label select operations"""
Expand Down Expand Up @@ -426,6 +485,7 @@ class MyModel(Model):
submodel = Field(SelfModel) # submodel gets the type `MyModel`
```
"""

pass


Expand All @@ -436,7 +496,10 @@ def __init__(self, api_error):
self.api_error = api_error

def __str__(self):
    """Render the error as ``<code>: reason=<reason> message=<message>``.

    Every field is looked up with a fallback of "(unset)", so formatting the error
    never raises even when the status object lacks a code, reason or message.
    """
    code = self.api_error.get("code", "(unset)")
    reason = self.api_error.get("reason", "(unset)")
    message = self.api_error.get("message", "(unset)")
    return f"{code}: reason={reason} message={message}"

@classmethod
def match(cls, event_json):
Expand Down
10 changes: 7 additions & 3 deletions k8s/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
#: * When connecting, a resourceVersion is used to resume, if still valid. This speaks for a low value,
#: to avoid them expiring.
#: * During idle periods, there might not be any new resourceVersions.
#: Therefore a full quorum read each time a timeout is reached.
#: Bookmark events are used to avoid this; they are sent at a server-specific
#: interval, but usually about once per minute.
#: This speaks for a high value.
#: 4.5 minutes is the default, set to detect the first case above in a reasonable time,
#: while being just below the default resourceVersion expiration of 5 minutes.
Expand All @@ -53,8 +54,10 @@


# disables bandit warning for this line which triggers because the string contains 'token', which is fine
def use_in_cluster_config(token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec
ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"):
def use_in_cluster_config(
token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec
ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
):
"""
Configure the client using the recommended configuration for accessing the API from within a Kubernetes cluster:
https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
Expand All @@ -72,6 +75,7 @@ class FileTokenSource(object):
Intended to support the BoundServiceAccountTokenVolume feature in Kubernetes 1.21 and later.
"""

def __init__(self, token_file, now_func=datetime.now):
self._token_file = token_file
self._expires_at = datetime(MINYEAR, 1, 1) # force read on initial call to _refresh_token
Expand Down
14 changes: 7 additions & 7 deletions k8s/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ class Watcher(object):
"""Higher-level interface to watch for changes in objects
The low-level :py:meth:`~.watch_list` method will stop when the API-server drops the connection.
When reconnecting, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED`
When reconnecting using that method, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED`
events for all objects, even if they have been seen before.
The Watcher will hide this complexity for you, and make sure to reconnect when the
connection drops, and skip events that have already been seen.
It additionally uses bookmarks to avoid the increased load that might be caused by reconnecting.
:param Model model: The model class to watch
:param int capacity: How many seen objects to keep track of
Expand All @@ -55,15 +56,14 @@ def watch(self, namespace=None):
while self._run_forever:
try:
for event in self._model.watch_list(
namespace=namespace, resource_version=last_seen_resource_version
namespace=namespace, resource_version=last_seen_resource_version, allow_bookmarks=True
):
last_seen_resource_version = event.resource_version
if not event.has_object():
continue
o = event.object
key = (o.metadata.name, o.metadata.namespace)
last_seen_resource_version = o.metadata.resourceVersion
if (
self._seen.get(key) == o.metadata.resourceVersion
and event.type != WatchEvent.DELETED
):
if self._seen.get(key) == o.metadata.resourceVersion and event.type != WatchEvent.DELETED:
continue
self._seen[key] = o.metadata.resourceVersion
yield event
Expand Down
21 changes: 20 additions & 1 deletion tests/k8s/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import requests
import requests.packages.urllib3 as urllib3

from k8s.base import Model, Field, WatchEvent, Equality, Inequality, In, NotIn, Exists
from k8s.base import APIServerError, Equality, Exists, Field, In, Inequality, Model, NotIn, WatchBookmark, WatchEvent
from k8s.models.common import DeleteOptions, Preconditions


Expand Down Expand Up @@ -146,3 +146,22 @@ def test_watch_list_with_timeout(self, client):
assert list(gen) == []
assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2
client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={})

def test_watch_list_api_error(self, client):
    """An ERROR event in the watch stream surfaces as an APIServerError carrying its message."""
    error_line = '{"type": "ERROR", "object": {"kind":"Status", "code": 500, "message": "Internal Server Error"}}'
    client.get.return_value.iter_lines.return_value = [error_line]

    events = Example.watch_list()

    with pytest.raises(APIServerError, match="Internal Server Error"):
        next(events)

def test_watch_list_bookmark(self, client):
    """A BOOKMARK line is yielded as a WatchBookmark and allowWatchBookmarks is passed as a query parameter."""
    bookmark_line = '{"type":"BOOKMARK", "object":{"metadata":{"resourceVersion": 4712}}}'
    client.get.return_value.iter_lines.return_value = [bookmark_line]
    expected_event = WatchBookmark({"type": "BOOKMARK", "object": {"metadata": {"resourceVersion": 4712}}})
    expected_params = {"resourceVersion": 4711, "allowWatchBookmarks": "true"}

    events = Example.watch_list(resource_version=4711, allow_bookmarks=True)

    assert next(events) == expected_event
    # The stream holds a single line, so nothing else may be yielded.
    assert list(events) == []
    client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params=expected_params)
30 changes: 27 additions & 3 deletions tests/k8s/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import mock
import pytest

from k8s.base import APIServerError, Model, Field, WatchEvent
from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent
from k8s.models.common import ObjectMeta
from k8s.watcher import Watcher

Expand Down Expand Up @@ -48,7 +48,7 @@ def test_multiple_events(self, api_watch_list):
watcher._run_forever = False
assert list(gen) == []

api_watch_list.assert_called_with(namespace=None, resource_version=None)
api_watch_list.assert_called_with(namespace=None, resource_version=None, allow_bookmarks=True)

def test_handle_reconnect(self, api_watch_list):
events = [_event(0, ADDED, 1)]
Expand Down Expand Up @@ -115,7 +115,7 @@ def stop_iteration(*args, **kwargs):

assert list(gen) == []

api_watch_list.assert_called_with(namespace=namespace, resource_version=None)
api_watch_list.assert_called_with(namespace=namespace, resource_version=None, allow_bookmarks=True)

def test_handle_410(self, api_watch_list):
watcher = Watcher(WatchListExample)
Expand All @@ -134,6 +134,30 @@ def test_handle_410(self, api_watch_list):
watcher._run_forever = False
assert list(gen) == []

def test_other_apierror(self, api_watch_list):
    """A 400 APIServerError raised by watch_list propagates out of Watcher.watch()."""
    watcher = Watcher(WatchListExample)
    bad_request = APIServerError({"code": 400, "message": "Bad Request"})
    api_watch_list.side_effect = bad_request

    events = watcher.watch()

    with pytest.raises(APIServerError, match="Bad Request"):
        next(events)

def test_bookmark(self, api_watch_list):
    """A bookmark between two object events is not yielded by the Watcher."""
    stream = [
        _event(0, ADDED, 1),
        WatchBookmark({"object": {"metadata": {"resourceVersion": "2"}}}),
        _event(1, MODIFIED, 3),
    ]
    watcher = Watcher(WatchListExample)
    api_watch_list.return_value.__getitem__.side_effect = stream
    # Seal the mock to make sure __iter__ is not used instead of __getitem__
    mock.seal(api_watch_list)

    events = watcher.watch()

    # Only the two object events come out; the bookmark in between is skipped.
    _assert_event(next(events), 0, ADDED, 1)
    _assert_event(next(events), 1, MODIFIED, 3)
    watcher._run_forever = False
    assert list(events) == []


def _event(id, event_type, rv, namespace="default"):
metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv}
Expand Down

0 comments on commit 8195fd2

Please sign in to comment.