diff --git a/k8s/base.py b/k8s/base.py index f3583ac..de6a995 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -21,7 +21,7 @@ import json import logging from collections import namedtuple -from typing import Optional +from typing import Optional, Dict, List import requests import requests.packages.urllib3 as urllib3 @@ -111,7 +111,7 @@ def find(cls, name="", namespace="default", labels=None): return [cls.from_dict(item) for item in resp.json()["items"]] @classmethod - def list(cls, namespace="default"): + def _list_raw(cls, namespace="default"): """List all resources in given namespace""" if namespace is None: if not cls._meta.list_url: @@ -120,8 +120,20 @@ def list(cls, namespace="default"): else: url = cls._build_url(name="", namespace=namespace) resp = cls._client.get(url) + return resp + + @classmethod + def list(cls, namespace="default"): + """List all resources in given namespace""" + resp = cls._list_raw(namespace=namespace) return [cls.from_dict(item) for item in resp.json()["items"]] + @classmethod + def list_with_meta(cls, namespace="default"): + """List all resources in given namespace. Return ModelList""" + resp = cls._list_raw(namespace=namespace) + return ModelList.from_dict(cls, resp.json()) + @classmethod def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False): """Return a generator that yields WatchEvents of cls. @@ -139,7 +151,7 @@ def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False # As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch # only resourceVersion is used for watch queries. params["resourceVersion"] = resource_version - LOG.info("Restarting %s watch at resource version %s", cls.__name__, resource_version) + LOG.info("(Re)starting %s watch at resource version %s", cls.__name__, resource_version) if allow_bookmarks: params["allowWatchBookmarks"] = "true" @@ -383,8 +395,11 @@ class WatchBaseEvent(ABC): __slots__ = ("resource_version",) - def __init__(self, event_json): - self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + def __init__(self, event_json=None, resource_version=None): + if event_json is not None: + self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + else: + self.resource_version = resource_version def __eq__(self, other): return self.resource_version == other.resource_version @@ -398,10 +413,21 @@ class WatchEvent(WatchBaseEvent): MODIFIED = "MODIFIED" DELETED = "DELETED" - def __init__(self, event_json, cls): - super(WatchEvent, self).__init__(event_json) - self.type = event_json["type"] - self.object = cls.from_dict(event_json["object"]) + def __init__(self, event_json: dict = None, cls: type[Model] = None, _type: str = None, _object: Model = None): + if event_json is not None and cls is not None: + super(WatchEvent, self).__init__(event_json=event_json) + self.type = event_json["type"] + self.object = cls.from_dict(event_json["object"]) + elif _type is not None and _object is not None: + # resource_version is effectively optional here to match the behavior for event_json in WatchBaseEvent + # in practice, watch events with None resourceVersion will break the caching in Watcher.watch() + resource_version = getattr(getattr(_object, "metadata", None), "resourceVersion", None) + super(WatchEvent, self).__init__(resource_version=resource_version) + self.type = _type + self.object = _object + else: + raise ValueError("requires either event_json and cls or _type and _object, " + + f"got {event_json=}, {cls=}, {_type=}, {_object=}") def __repr__(self): return "{cls}(type={type}, object={object})".format( @@ -415,6 +441,11 @@ def has_object(self): return True +class SyntheticAddedWatchEvent(WatchEvent): + def __init__(self, obj: Model): + super(SyntheticAddedWatchEvent, self).__init__(_type=WatchEvent.ADDED, _object=obj) + + class WatchBookmark(WatchBaseEvent): """Bookmark events, if enabled, are sent periodically by the API server. They only contain the resourceVersion of the event.""" @@ -505,3 +536,26 @@ def __str__(self): @classmethod def match(cls, event_json): return event_json["type"] == "ERROR" and event_json["object"].get("kind") == "Status" + + +class ListMeta(Model): + _continue = Field(str) + remainingItemCount = Field(int) + resourceVersion = Field(str) + + +class ModelList: + """ + Generic type to hold list of Model instances (items) together with ListMeta (metadata), + as returned by list API calls + """ + + def __init__(self, metadata: ListMeta, items: List[Model]): + self.metadata = metadata + self.items = items + + @classmethod + def from_dict(cls, model_cls: type[Model], list_response_data: Dict): + metadata = ListMeta.from_dict(list_response_data.get('metadata', {})) + items = [model_cls.from_dict(item) for item in list_response_data.get('items', [])] + return cls(metadata, items) diff --git a/k8s/watcher.py b/k8s/watcher.py index e81c3b5..cfc2d05 100644 --- a/k8s/watcher.py +++ b/k8s/watcher.py @@ -17,11 +17,14 @@ import cachetools +import logging -from .base import APIServerError, WatchEvent +from .base import APIServerError, WatchEvent, SyntheticAddedWatchEvent DEFAULT_CAPACITY = 1000 +LOG = logging.getLogger(__name__) + class Watcher(object): """Higher-level interface to watch for changes in objects @@ -54,6 +57,17 @@ def watch(self, namespace=None): # Only used on reconnects, the first call to watch does a quorum read. last_seen_resource_version = None while self._run_forever: + if last_seen_resource_version is None: + # list all resources and yield a synthetic ADDED watch event for each + model_list = self._model.list_with_meta() + LOG.info("Got %d %s instances from quorum read", len(model_list.items), self._model.__name__) + for obj in model_list.items: + event = SyntheticAddedWatchEvent(obj) + # _should_yield is mainly called here to feed the self._seen cache + if self._should_yield(event): + yield event + # watch connection should start at the version of the initial list + last_seen_resource_version = model_list.metadata.resourceVersion try: for event in self._model.watch_list( namespace=namespace, resource_version=last_seen_resource_version, allow_bookmarks=True diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index a14506a..e05a3ca 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -21,7 +21,8 @@ import requests.packages.urllib3 as urllib3 from k8s.base import APIServerError, Equality, Exists, Field, In, Inequality, Model, NotIn, WatchBookmark, WatchEvent -from k8s.models.common import DeleteOptions, Preconditions +from k8s.client import NotFound, ServerError, ClientError +from k8s.models.common import DeleteOptions, Preconditions, ObjectMeta class Example(Model): @@ -29,24 +30,77 @@ class Meta: url_template = '/example' watch_list_url = '/watch/example' + metadata = Field(ObjectMeta) value = Field(int) +def _example_object(value=42, resource_version="1"): + # Since metadata.resourceVersion is a ReadOnlyField values set are ignored. To avoid this we have to use from_dict + # to set the field value, like when deserializing an API response. + metadata = ObjectMeta.from_dict({"resourceVersion": resource_version}) + return Example(metadata=metadata, value=value) + + class TestWatchEvent(object): def test_watch_event_added(self): - watch_event = WatchEvent({"type": "ADDED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.ADDED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj def test_watch_event_modified(self): - watch_event = WatchEvent({"type": "MODIFIED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.MODIFIED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj def test_watch_event_deleted(self): - watch_event = WatchEvent({"type": "DELETED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "DELETED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.DELETED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj + + @pytest.mark.parametrize( + "_type", + ( + WatchEvent.ADDED, + WatchEvent.MODIFIED, + WatchEvent.DELETED, + ), + ) + def test_watch_event_type_object(self, _type): + obj = _example_object(42, "1") + watch_event = WatchEvent(_type=_type, _object=obj) + assert watch_event.type == _type + assert watch_event.object == obj + + @pytest.mark.parametrize( + "kwargs", + ( + # invalid combinations of keyword arguments + dict( + event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}, + _type=WatchEvent.MODIFIED, + ), + dict( + event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}, + _object=_example_object(42, "1"), + ), + dict(cls=Example, _type=WatchEvent.MODIFIED), + dict(cls=Example, _object=_example_object(42, "1")), + # passed only one keyword argument, but a correct pair of arguments is required + dict(event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}), + dict(_object=_example_object(42, "1")), + dict(_type=WatchEvent.MODIFIED), + dict(cls=Example), + ), + ) + def test_watch_event_invalid_params(self, kwargs): + with pytest.raises(ValueError): + WatchEvent(**kwargs) class TestFind(object): @@ -59,23 +113,23 @@ def test_find_by_name(self, client): Example.find("app_name") client.get.assert_called_once_with("/example", params={"labelSelector": "app=app_name"}) - @pytest.mark.parametrize("value, selector", ( - (Equality("my_value"), "my_key=my_value"), - (Inequality("my_value"), "my_key!=my_value"), - (In(("value1", "value2")), "my_key in (value1,value2)"), - (NotIn(("value1", "value2")), "my_key notin (value1,value2)"), - (Exists(), "my_key"), - ("my_unwrapped_value", "my_key=my_unwrapped_value"), - )) + @pytest.mark.parametrize( + "value, selector", + ( + (Equality("my_value"), "my_key=my_value"), + (Inequality("my_value"), "my_key!=my_value"), + (In(("value1", "value2")), "my_key in (value1,value2)"), + (NotIn(("value1", "value2")), "my_key notin (value1,value2)"), + (Exists(), "my_key"), + ("my_unwrapped_value", "my_key=my_unwrapped_value"), + ), + ) def test_find_by_selectors(self, client, value, selector): Example.find(labels={"my_key": value}) client.get.assert_called_once_with("/example", params={"labelSelector": selector}) def test_repeated_keys_in_label_selector(self, client): - labels = [ - ("foo", Inequality("bar")), - ("foo", Exists()) - ] + labels = [("foo", Inequality("bar")), ("foo", Exists())] Example.find(labels=labels) expected_selector = "foo!=bar,foo" @@ -84,7 +138,6 @@ def test_repeated_keys_in_label_selector(self, client): class TestDeleteList(object): - @pytest.fixture def client(self): with mock.patch.object(Example, "_client") as m: @@ -101,7 +154,7 @@ def test_delete_with_options(self, client): dryRun=[], gracePeriodSeconds=30, preconditions=Preconditions(uid="1234", resourceVersion="12"), - propagationPolicy="Foreground" + propagationPolicy="Foreground", ) Example.delete_list(labels={"foo": "bar"}, delete_options=opts) @@ -109,11 +162,8 @@ def test_delete_with_options(self, client): "apiVersion": "foo/v1", "dryRun": [], "gracePeriodSeconds": 30, - "preconditions": { - "uid": "1234", - "resourceVersion": "12" - }, - "propagationPolicy": "Foreground" + "preconditions": {"uid": "1234", "resourceVersion": "12"}, + "propagationPolicy": "Foreground", } client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body) @@ -126,23 +176,25 @@ def client(self): def test_watch_list(self, client): client.get.return_value.iter_lines.return_value = [ - '{"type": "ADDED", "object": {"value": 1}}', + '{"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}}', ] gen = Example.watch_list() - assert next(gen) == WatchEvent({"type": "ADDED", "object": {"value": 1}}, Example) + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} + assert next(gen) == WatchEvent(event_dict, Example) client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) assert list(gen) == [] def test_watch_list_with_timeout(self, client): client.get.return_value.iter_lines.return_value.__getitem__.side_effect = [ - '{"type": "ADDED", "object": {"value": 1}}', + '{"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}}', requests.ConnectionError(urllib3.exceptions.ReadTimeoutError("", "", "")), - '{"type": "MODIFIED", "object": {"value": 2}}', # Not reached + '{"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "2"}, "value": 2}}', # Not reached ] # Seal to avoid __iter__ being used instead of __getitem__ mock.seal(client) gen = Example.watch_list() - assert next(gen) == WatchEvent({"type": "ADDED", "object": {"value": 1}}, Example) + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} + assert next(gen) == WatchEvent(event_dict, Example) assert list(gen) == [] assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) @@ -165,3 +217,106 @@ def test_watch_list_bookmark(self, client): client.get.assert_called_once_with( "/watch/example", stream=True, timeout=270, params={"resourceVersion": 4711, "allowWatchBookmarks": "true"} ) + + +class TestList: + @pytest.fixture + def response(self): + data = { + "metadata": { + "resourceVersion": "1", + "continue": "ENCODED_CONTINUE_TOKEN", + "remainingItemCount": 1, + }, + "items": [ + {"value": 42}, + {"value": 1337}, + ], + } + resp = mock.create_autospec(requests.Response, spec_set=True) + resp.json.return_value = data + yield resp + + @pytest.fixture + def response_empty(self): + data = { + "metadata": { + "resourceVersion": "2", + }, + "items": [], + } + resp = mock.create_autospec(requests.Response, spec_set=True) + resp.json.return_value = data + yield resp + + @pytest.fixture + def client(self): + with mock.patch.object(Example, "_client") as m: + yield m + + def test_list(self, client, response): + client.get.return_value = response + + expected = [ + Example(value=42), + Example(value=1337), + ] + assert Example.list() == expected + + def test_list_with_meta(self, client, response): + client.get.return_value = response + + expected_items = [ + Example(value=42), + Example(value=1337), + ] + + actual = Example.list_with_meta() + + assert actual.metadata.resourceVersion == "1" + assert actual.metadata._continue == "ENCODED_CONTINUE_TOKEN" + assert actual.metadata.remainingItemCount == 1 + assert actual.items == expected_items + + def test_list_empty(self, client, response_empty): + client.get.return_value = response_empty + + assert Example.list() == [] + + def test_list_with_meta_empty(self, client, response_empty): + client.get.return_value = response_empty + + actual = Example.list_with_meta() + + assert actual.metadata.resourceVersion == "2" + assert actual.metadata._continue is None + assert actual.metadata.remainingItemCount is None + assert actual.items == [] + + @pytest.mark.parametrize( + "exception", + ( + NotFound, + ClientError, + ServerError, + ), + ) + def test_list_error(self, client, exception): + client.get.side_effect = exception + + with pytest.raises(exception): + Example.list() + + @pytest.mark.parametrize( + "exception", + ( + NotFound, + ClientError, + ServerError, + ), + ) + def test_list_with_meta_error(self, client, exception): + client.get.side_effect = exception + + with pytest.raises(exception): + Example.list_with_meta() diff --git a/tests/k8s/test_client.py b/tests/k8s/test_client.py index 55d0aeb..307462e 100644 --- a/tests/k8s/test_client.py +++ b/tests/k8s/test_client.py @@ -223,8 +223,8 @@ def test_watch_list_payload_ok(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), - _create_watchevent(WatchEvent.MODIFIED, WatchListExample(value=3, requiredValue=4)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.MODIFIED, _object=WatchListExample(value=3, requiredValue=4)), ] items = list(WatchListExample.watch_list()) @@ -257,9 +257,9 @@ def test_watch_list_payload_invalid_json(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), # "definitely not valid json" should be discarded - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) @@ -297,21 +297,15 @@ def test_watch_list_payload_invalid_object(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), # event with value=10 and requiredValue missing should be discarded - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) assert items == expected -def _create_watchevent(event_type, event_object): - """factory function for WatchEvent to make it easier to create test data from actual objects, as the constructor - takes a dict (unmarshaled json)""" - return WatchEvent({"type": event_type, "object": event_object.as_dict()}, event_object.__class__) - - def _absolute_url(url): return config.api_server + url diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 860cbbd..6fcd73c 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -19,7 +19,7 @@ import mock import pytest -from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent +from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent, ModelList, ListMeta from k8s.models.common import ObjectMeta from k8s.watcher import Watcher @@ -29,38 +29,165 @@ DELETED = WatchEvent.DELETED +class WatchListExample(Model): + class Meta: + url_template = '/example' + watch_list_url = '/watch/example' + watch_list_url_template = '/watch/{namespace}/example' + + apiVersion = Field(str, "example.com/v1") + kind = Field(str, "Example") + metadata = Field(ObjectMeta) + value = Field(int) + + +def _example_resource(_id, rv, namespace="default"): + metadict = {"name": "name{}".format(_id), "namespace": namespace, "resourceVersion": str(rv)} + metadata = ObjectMeta.from_dict(metadict) + return WatchListExample(metadata=metadata, value=(_id * 100) + rv) + + +def _event(_id, event_type, rv, namespace="default"): + wle = _example_resource(_id, rv, namespace) + return WatchEvent(_type=event_type, _object=wle) + + +def _assert_event(event, _id, event_type, rv, namespace="default"): + assert event.type == event_type + o = event.object + assert o.kind == "Example" + assert o.metadata.name == "name{}".format(_id) + assert o.metadata.namespace == namespace + assert o.value == (_id * 100) + rv + + @pytest.mark.usefixtures("k8s_config", "logger") class TestWatcher(object): + @pytest.fixture + def api_list_with_meta(self): + with mock.patch("k8s.base.ApiMixIn.list_with_meta") as m: + yield m + @pytest.fixture def api_watch_list(self): with mock.patch("k8s.base.ApiMixIn.watch_list") as m: yield m - def test_multiple_events(self, api_watch_list): - number_of_events = 20 - events = [_event(i, ADDED, 1) for i in range(number_of_events)] + @pytest.mark.parametrize( + 'initial_resources,list_resource_version,events', + ( + # 20 initial resources, then 20 watch events + ([_example_resource(i, 100 + i) for i in range(20)], "200", [_event(i, ADDED, 300 + i) for i in range(20)]), + # 20 initial resources, no watch events + ([_example_resource(i, 100 + i) for i in range(20)], "200", []), + # no initial resources, 20 watch events + ([], "1", [_event(i, ADDED, 300 + i) for i in range(20)]), + ), + ) + def test_multiple_events( + self, api_watch_list, api_list_with_meta, initial_resources, list_resource_version, events + ): + model_list = ModelList(metadata=ListMeta(resourceVersion=list_resource_version), items=initial_resources) + api_list_with_meta.return_value = model_list api_watch_list.side_effect = [events] + watcher = Watcher(WatchListExample) gen = watcher.watch() - for i in range(number_of_events): - _assert_event(next(gen), i, ADDED, 1) + # verify that the initial resources are yielded by the watcher first + for i in range(len(initial_resources)): + _assert_event(next(gen), i, ADDED, 100 + i) + + # verify that the events from the watch_list call are yielded by the watcher + for i in range(len(events)): + _assert_event(next(gen), i, ADDED, 300 + i) + + # stop the watcher loop and verify that there are no more events watcher._run_forever = False assert list(gen) == [] - api_watch_list.assert_called_with(namespace=None, resource_version=None, allow_bookmarks=True) + api_list_with_meta.assert_called_with() + # verify watch_list was called with resourceVersion returned by list call + api_watch_list.assert_called_with(namespace=None, resource_version=list_resource_version, allow_bookmarks=True) + + def test_no_events(self, api_watch_list, api_list_with_meta): + list_resource_version = "1" + model_list = ModelList(metadata=ListMeta(resourceVersion=list_resource_version), items=[]) + api_list_with_meta.return_value = model_list + + def stop_iteration(*args, **kwargs): + watcher._run_forever = False + return [] + + api_watch_list.side_effect = stop_iteration + + watcher = Watcher(WatchListExample) + gen = watcher.watch() + + assert list(gen) == [] + + api_list_with_meta.assert_called_with() + # verify watch_list was called with resourceVersion returned by list call + api_watch_list.assert_called_with(namespace=None, resource_version=list_resource_version, allow_bookmarks=True) + + def test_handle_watcher_cache_watch(self, api_watch_list, api_list_with_meta): + # if the same event (same name, namespace and resource version) is returned by watch_list multiple times, it + # should only be yielded once. If a DELETED event is received with the same resourceVersion, it should be + # yielded. + # If a DELETED event is received with the same resourceVersion as a previous event, it should be yielded. + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[]) + api_list_with_meta.return_value = model_list + + # yield event with resource twice, and stop the watcher after yielding the second event + event = _event(0, ADDED, 1) + delete_event = _event(0, DELETED, 1) + + def side_effect(*args, **kwargs): + yield event + yield event + yield delete_event + watcher._run_forever = False + + api_watch_list.side_effect = side_effect - def test_handle_reconnect(self, api_watch_list): - events = [_event(0, ADDED, 1)] - api_watch_list.side_effect = [events, events] watcher = Watcher(WatchListExample) gen = watcher.watch() _assert_event(next(gen), 0, ADDED, 1) - watcher._run_forever = False + _assert_event(next(gen), 0, DELETED, 1) assert list(gen) == [] - def test_handle_changes(self, api_watch_list): + def test_handle_watcher_cache_list(self, api_watch_list, api_list_with_meta): + # if the same event (same name, namespace and resource version) is returned by list and watch_list multiple + # times, it should only be yielded once. + # If a DELETED event is received with the same resourceVersion as a previous event, it should be yielded. + resource = _example_resource(0, 1) + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[resource]) + api_list_with_meta.return_value = model_list + + # yield event twice, and stop the watcher after yielding the second event + event = WatchEvent(_type=WatchEvent.ADDED, _object=resource) + delete_event = WatchEvent(_type=WatchEvent.DELETED, _object=resource) + + def side_effect(*args, **kwargs): + yield event + yield event + yield delete_event + watcher._run_forever = False + + api_watch_list.side_effect = side_effect + + watcher = Watcher(WatchListExample) + gen = watcher.watch() + + _assert_event(next(gen), 0, ADDED, 1) + _assert_event(next(gen), 0, DELETED, 1) + assert list(gen) == [] + + def test_handle_changes(self, api_watch_list, api_list_with_meta): + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[]) + api_list_with_meta.return_value = model_list + events = [_event(0, ADDED, 1), _event(0, MODIFIED, 2)] api_watch_list.side_effect = [events] watcher = Watcher(WatchListExample) @@ -72,7 +199,11 @@ def test_handle_changes(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - def test_complicated(self, api_watch_list): + def test_complicated(self, api_watch_list, api_list_with_meta): + initial_resources = [_example_resource(1, 0), _example_resource(2, 0)] + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=initial_resources) + api_list_with_meta.return_value = model_list + first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)] second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)] third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)] @@ -81,30 +212,36 @@ def test_complicated(self, api_watch_list): watcher = Watcher(WatchListExample) gen = watcher.watch() - # First batch + # Synthetic added events for the initial resources + _assert_event(next(gen), 1, ADDED, 0) + _assert_event(next(gen), 2, ADDED, 0) + + # First batch of events _assert_event(next(gen), 0, ADDED, 1) _assert_event(next(gen), 1, ADDED, 1) _assert_event(next(gen), 2, ADDED, 1) - # Second batch + # Second batch of events _assert_event(next(gen), 1, ADDED, 2) _assert_event(next(gen), 0, MODIFIED, 2) - # Third batch + # Third batch of events _assert_event(next(gen), 1, DELETED, 2) _assert_event(next(gen), 2, MODIFIED, 2) - # Fourth batch + # Fourth batch of events _assert_event(next(gen), 0, ADDED, 1, "other") _assert_event(next(gen), 0, MODIFIED, 2, "other") watcher._run_forever = False assert list(gen) == [] - def test_namespace(self, api_watch_list): + def test_namespace(self, api_watch_list, api_list_with_meta): namespace = "the-namespace" watcher = Watcher(WatchListExample) + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) + def stop_iteration(*args, **kwargs): watcher._run_forever = False return [] @@ -115,33 +252,90 @@ def stop_iteration(*args, **kwargs): assert list(gen) == [] + api_list_with_meta.assert_called_with() api_watch_list.assert_called_with(namespace=namespace, resource_version=None, allow_bookmarks=True) - def test_handle_410(self, api_watch_list): + def test_handle_410_list(self, api_watch_list, api_list_with_meta): + # the initial list call should not receive 410, since it doesn't send a resourceversion. If it does, something + # is probably wrong, and the exception should be propagated to the caller. + api_list_with_meta.side_effect = APIServerError({"code": 410, "message": "Gone"}) + watcher = Watcher(WatchListExample) + with pytest.raises(APIServerError, match="Gone"): + next(watcher.watch()) + + def test_handle_410_watch(self, api_watch_list, api_list_with_meta): + # 410 response can occur if watch connection starts with a too old resourceVersion + # - this can happen on reconnect if last_seen_resource_version is too old, for example if the apiserver + # doesn't send bookmark events, or sends them at intervals longer than the client watch timeout + # (k8s.config.stream_timeout, default 4.5 minutes) + # - in theory the resourceVersion returned by the list call could be too old when the watch connection starts + # if it takes too long to yield syntetic added watch events for the items returned by the list call. + # How long this takes depends on the consumer of the generator returned by watcher.watch(). If this happens, + # the watcher will do another quorum read. Since there is a cache of seen items in the watcher, as long as + # all items fit in the cache, the number of events yielded for items from the list call approach zero, + # eventually allowing the watch connection to start. + watcher = Watcher(WatchListExample) + + first_list_resource_version = "1" + second_list_resource_version = "4" + api_list_with_meta.side_effect = [ + ModelList(metadata=ListMeta(resourceVersion=first_list_resource_version), items=[_example_resource(0, 0)]), + ModelList(metadata=ListMeta(resourceVersion=second_list_resource_version), items=[_example_resource(0, 0)]), + ] api_watch_list.return_value.__getitem__.side_effect = [ - _event(0, ADDED, 1), + _event(1, ADDED, 2), APIServerError({"code": 410, "message": "Gone"}), - _event(0, MODIFIED, 2), + _event(1, MODIFIED, 3), ] # Seal the mock to make sure __iter__ is not used instead of __getitem__ mock.seal(api_watch_list) gen = watcher.watch() - _assert_event(next(gen), 0, ADDED, 1) - _assert_event(next(gen), 0, MODIFIED, 2) + + # synthetic added event for initial resource and added event + _assert_event(next(gen), 0, ADDED, 0) + _assert_event(next(gen), 1, ADDED, 2) + api_list_with_meta.assert_called_once() + api_watch_list.assert_called_once_with( + namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True + ) + + # next will raise 410 from watch_list, call list and watch_list again, then yield the last event + _assert_event(next(gen), 1, MODIFIED, 3) + # verify list and watch_list has now been called twice, and each call of watch_list used the resourceVersion + # returned by the preceding list call + assert api_list_with_meta.call_args_list == [mock.call(), mock.call()] + assert api_watch_list.call_args_list == [ + mock.call(namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True), + mock.call(namespace=None, resource_version=second_list_resource_version, allow_bookmarks=True), + ] + + # no more events watcher._run_forever = False assert list(gen) == [] - def test_other_apierror(self, api_watch_list): + def test_other_apierror_list(self, api_list_with_meta): + watcher = Watcher(WatchListExample) + + api_list_with_meta.side_effect = APIServerError({"code": 400, "message": "Bad Request"}) + + with pytest.raises(APIServerError, match="Bad Request"): + next(watcher.watch()) + + def test_other_apierror_watch(self, api_watch_list, api_list_with_meta): watcher = Watcher(WatchListExample) + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) api_watch_list.side_effect = APIServerError({"code": 400, "message": "Bad Request"}) with pytest.raises(APIServerError, match="Bad Request"): next(watcher.watch()) - def test_bookmark(self, api_watch_list): + def test_bookmark(self, api_watch_list, api_list_with_meta): + watcher = Watcher(WatchListExample) + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) + watcher = Watcher(WatchListExample) api_watch_list.return_value.__getitem__.side_effect = [ @@ -157,35 +351,3 @@ def test_bookmark(self, api_watch_list): _assert_event(next(gen), 1, MODIFIED, 3) watcher._run_forever = False assert list(gen) == [] - - -def _event(id, event_type, rv, namespace="default"): - metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv} - metadata = ObjectMeta.from_dict(metadict) - wle = WatchListExample(metadata=metadata, value=(id * 100) + rv) - return mock.NonCallableMagicMock(type=event_type, object=wle) - - -def _assert_event(event, id, event_type, rv, namespace="default"): - assert event.type == event_type - o = event.object - assert o.kind == "Example" - assert o.metadata.name == "name{}".format(id) - assert o.metadata.namespace == namespace - assert o.value == (id * 100) + rv - - -class WatchListExample(Model): - class Meta: - url_template = '/example' - watch_list_url = '/watch/example' - watch_list_url_template = '/watch/{namespace}/example' - - apiVersion = Field(str, "example.com/v1") - kind = Field(str, "Example") - metadata = Field(ObjectMeta) - value = Field(int) - - -class SentinelException(Exception): - pass