Skip to content

Commit

Permalink
Merge pull request #124 from fiaas/watch-read-timeout
Browse files Browse the repository at this point in the history
Lower the stream timeout to detect control plan rotation faster.
  • Loading branch information
perj authored Jan 22, 2024
2 parents 9fb405d + d4d1730 commit c52bda4
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 55 deletions.
183 changes: 156 additions & 27 deletions k8s/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from abc import ABC

import json
import logging
from collections import namedtuple
from typing import Optional

import requests
import requests.packages.urllib3 as urllib3

from . import config
from .client import Client, NotFound
Expand Down Expand Up @@ -50,7 +57,7 @@ def __new__(mcs, cls, bases, attrs):
"watch_list_url": getattr(attr_meta, "watch_list_url", ""),
"watch_list_url_template": getattr(attr_meta, "watch_list_url_template", ""),
"fields": [],
"field_names": []
"field_names": [],
}
field_names = meta["field_names"]
fields = meta["fields"]
Expand All @@ -71,6 +78,7 @@ class ApiMixIn(object):
Contains methods for working with the API
"""

_client = Client()

@classmethod
Expand Down Expand Up @@ -100,7 +108,7 @@ def find(cls, name="", namespace="default", labels=None):
labels = {"app": Equality(name)}
selector = cls._label_selector(labels)
resp = cls._client.get(url, params={"labelSelector": selector})
return [cls.from_dict(item) for item in resp.json()[u"items"]]
return [cls.from_dict(item) for item in resp.json()["items"]]

@classmethod
def list(cls, namespace="default"):
Expand All @@ -112,36 +120,100 @@ def list(cls, namespace="default"):
else:
url = cls._build_url(name="", namespace=namespace)
resp = cls._client.get(url)
return [cls.from_dict(item) for item in resp.json()[u"items"]]
return [cls.from_dict(item) for item in resp.json()["items"]]

@classmethod
def watch_list(cls, namespace=None):
"""Return a generator that yields WatchEvents of cls"""
def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False):
"""Return a generator that yields WatchEvents of cls.
If allowBookmarks is True, WatchBookmarks will also be yielded.
It's recommended to use the Watcher class instead of calling this directly,
since it handles reconnects and resource versions.
"""
url = cls._watch_list_url(namespace)

# We don't pass timeoutSeconds to the server, since our timeout is between each event,
# while the server will apply the timeout as a maximum time serving the full request,
# hanging up regardless of time between events. Let the server decide that timeout.
params = {}
if resource_version:
# As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch
# only resourceVersion is used for watch queries.
params["resourceVersion"] = resource_version
LOG.info("Restarting %s watch at resource version %s", cls.__name__, resource_version)
if allow_bookmarks:
params["allowWatchBookmarks"] = "true"

try:
# The timeout here appears to be per call to the poll (or similar) system call,
# so each time data is received, the timeout will reset.
resp = cls._client.get(url, stream=True, timeout=config.stream_timeout, params=params)
for line in resp.iter_lines(chunk_size=None):
event = cls._parse_watch_event(line) if line else None
if event:
yield event
except requests.ConnectionError as e:
# ConnectionError is fairly generic, but check for ReadTimeoutError from urllib3.
# If we get this, there were no events received for the timeout period, which might not be an error,
# just a quiet period.
underlying = e.args[0]
if isinstance(underlying, urllib3.exceptions.ReadTimeoutError):
LOG.info(
"Read timeout while waiting for new %s events.",
cls.__name__,
)
return
raise

@classmethod
def _watch_list_url(cls, namespace):
"""Loads the optionally namespaced url from the class meta"""
if namespace:
if cls._meta.watch_list_url_template:
url = cls._meta.watch_list_url_template.format(namespace=namespace)
else:
raise NotImplementedError(
"Cannot watch_list with namespace, no watch_list_url_template defined on class {}".format(cls))
"Cannot watch_list with namespace, no watch_list_url_template defined on class {}".format(cls)
)
else:
url = cls._meta.watch_list_url
if not url:
raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls))
return url

resp = cls._client.get(url, stream=True, timeout=config.stream_timeout)
for line in resp.iter_lines(chunk_size=None):
if line:
try:
event_json = json.loads(line)
try:
event = WatchEvent(event_json, cls)
yield event
except TypeError:
LOG.exception(
"Unable to create instance of %s from watch event json, discarding event. event_json=%r",
cls.__name__, event_json)
except ValueError:
LOG.exception("Unable to parse JSON on watch event, discarding event. Line: %r", line)
@classmethod
def _parse_watch_event(cls, line) -> Optional[WatchBaseEvent]:
"""
Parse a line from the watch stream into a WatchEvent or WatchBookmark.
Raises APIServerError if the line is an error event.
"""
try:
event_json = json.loads(line)
if APIServerError.match(event_json):
err = APIServerError(event_json["object"])
LOG.warning(
"Received error event from API server: %s",
err,
)
raise err
if WatchBookmark.match(event_json):
LOG.debug("Received bookmark from API server: %s", event_json)
event = WatchBookmark(event_json)
else:
LOG.debug("Received watch event from API server: %s", event_json)
event = WatchEvent(event_json, cls)
return event
except TypeError:
LOG.exception(
"Unable to create instance of %s from watch event json, discarding event. event_json=%r",
cls.__name__,
event_json,
)
except ValueError:
LOG.exception(
"Unable to parse JSON on watch event, discarding event. Line: %r",
line,
)
return None

@classmethod
def get(cls, name, namespace="default"):
Expand Down Expand Up @@ -197,7 +269,7 @@ def save_status(self):

@staticmethod
def _label_selector(labels):
""" Build a labelSelector string from a collection of key/values. The parameter can be either
"""Build a labelSelector string from a collection of key/values. The parameter can be either
a dict, or a list of (key, value) tuples (this allows for repeating a key).
The keys/values are used to build the `labelSelector` parameter to the API,
Expand Down Expand Up @@ -246,7 +318,8 @@ def __init__(self, new=True, **kwargs):
field.default_value_create_instance = False
if kwarg_names:
raise TypeError(
"{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names)))
"{}() got unexpected keyword-arguments: {}".format(self.__class__.__name__, ", ".join(kwarg_names))
)
if self._new:
self._validate_fields()

Expand Down Expand Up @@ -288,8 +361,10 @@ def from_dict(cls, d):
return instance

def __repr__(self):
return "{}({})".format(self.__class__.__name__,
", ".join("{}={}".format(key, getattr(self, key)) for key in self._meta.field_names))
return "{}({})".format(
self.__class__.__name__,
", ".join("{}={}".format(key, getattr(self, key)) for key in self._meta.field_names),
)

def __eq__(self, other):
try:
Expand All @@ -302,22 +377,58 @@ def _api_name(name):
return name[1:] if name.startswith("_") else name


class WatchEvent(object):
class WatchBaseEvent(ABC):
"""Abstract base class for Watch events.
Contains the resource version of the event as property resource_version."""

__slots__ = ("resource_version",)

def __init__(self, event_json):
self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion")

def __eq__(self, other):
return self.resource_version == other.resource_version

def has_object(self):
...


class WatchEvent(WatchBaseEvent):
ADDED = "ADDED"
MODIFIED = "MODIFIED"
DELETED = "DELETED"

def __init__(self, event_json, cls):
super(WatchEvent, self).__init__(event_json)
self.type = event_json["type"]
self.object = cls.from_dict(event_json["object"])

def __repr__(self):
return "{cls}(type={type}, object={object})".format(cls=self.__class__.__name__, type=self.type,
object=self.object)
return "{cls}(type={type}, object={object})".format(
cls=self.__class__.__name__, type=self.type, object=self.object
)

def __eq__(self, other):
return self.type == other.type and self.object == other.object

def has_object(self):
return True


class WatchBookmark(WatchBaseEvent):
"""Bookmark events, if enabled, are sent periodically by the API server.
They only contain the resourceVersion of the event."""

def __init__(self, event_json):
super(WatchBookmark, self).__init__(event_json)

@classmethod
def match(cls, event_json):
return event_json["type"] == "BOOKMARK"

def has_object(self):
return False


class LabelSelector(object):
"""Base for label select operations"""
Expand Down Expand Up @@ -375,4 +486,22 @@ class MyModel(Model):
submodel = Field(SelfModel) # submodel gets the type `MyModel`
```
"""

pass


class APIServerError(Exception):
"""Raised when the API server returns an error event in the watch stream"""

def __init__(self, api_error):
self.api_error = api_error

def __str__(self):
code = self.api_error["code"]
reason = self.api_error.get("reason", "(unset)")
message = self.api_error.get("message", "(unset)")
return f"{code}: reason={reason} message={message}"

@classmethod
def match(cls, event_json):
return event_json["type"] == "ERROR" and event_json["object"].get("kind") == "Status"
25 changes: 20 additions & 5 deletions k8s/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,29 @@
debug = False
#: Default timeout for most operations
timeout = 20
#: Default timeout for streaming operations
stream_timeout = 3600
#: Default size of Watcher cache
#: Default timeout for streaming operations, used while waiting for more events.
#: When reached, the library will usually info log and reconnect.
#: There's a few considerations when setting this value:
#: * On some servers, it might take this long to detect a dropped connection. This speaks for a low value,
#: to detect the issue faster.
#: * When connecting, a resourceVersion is used to resume, if still valid. This speaks for a low value,
#: to avoid them expiring.
#: * During idle periods, there might not be any new resourceVersions.
#: Bookmarks events are used to avoid this, they are sent at a server
#: specific interval, but usually about once per minute.
#: This speaks for a high value.
#: 4.5 minutes is the default, set to detect the first case above in a reasonable time,
#:while being just below the default resourceVersion expiration of 5 minutes.
stream_timeout = 270
#: Default size of Watcher cache. If you expect a lot of events, you might want to increase this.
watcher_cache_size = 1000


# disables bandit warning for this line which triggers because the string contains 'token', which is fine
def use_in_cluster_config(token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec
ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"):
def use_in_cluster_config(
token_file="/var/run/secrets/kubernetes.io/serviceaccount/token", # nosec
ca_cert_file="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
):
"""
Configure the client using the recommended configuration for accessing the API from within a Kubernetes cluster:
https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
Expand All @@ -61,6 +75,7 @@ class FileTokenSource(object):
Intended to support the BoundServiceAccountTokenVolume feature in Kubernetes 1.21 and later.
"""

def __init__(self, token_file, now_func=datetime.now):
self._token_file = token_file
self._expires_at = datetime(MINYEAR, 1, 1) # force read on initial call to _refresh_token
Expand Down
Loading

0 comments on commit c52bda4

Please sign in to comment.