-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a Watcher, with some high-level handling of watches
- Loading branch information
Showing
3 changed files
with
132 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 | ||
|
||
import cachetools | ||
|
||
from k8s.base import WatchEvent | ||
|
||
DEFAULT_CAPACITY = 1000 | ||
|
||
|
||
class Watcher(object): | ||
def __init__(self, model, capacity=DEFAULT_CAPACITY): | ||
self._seen = cachetools.LRUCache(capacity) | ||
self._model = model | ||
|
||
def watch(self): | ||
while True: | ||
for event in self._model.watch_list(): | ||
o = event.object | ||
key = (o.metadata.name, o.metadata.resourceVersion) | ||
if key in self._seen and event.type != WatchEvent.DELETED: | ||
continue | ||
self._seen[key] = True | ||
yield event |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 | ||
|
||
import mock | ||
import pytest | ||
import six | ||
|
||
from k8s.base import Model, Field, WatchEvent | ||
from k8s.models.common import ObjectMeta | ||
from k8s.watcher import Watcher | ||
|
||
# Just to make things shorter | ||
ADDED = WatchEvent.ADDED | ||
MODIFIED = WatchEvent.MODIFIED | ||
DELETED = WatchEvent.DELETED | ||
|
||
|
||
@pytest.mark.usefixtures("k8s_config", "logger") | ||
class TestWatcher(object): | ||
@pytest.fixture | ||
def api_watch_list(self): | ||
with mock.patch('k8s.base.ApiMixIn.watch_list') as m: | ||
yield m | ||
|
||
def test_multiple_events(self, api_watch_list): | ||
number_of_events = 20 | ||
events = [_event(i, ADDED, 1) for i in range(number_of_events)] | ||
api_watch_list.side_effect = [events] | ||
gen = Watcher(WatchListExample).watch() | ||
|
||
for i in range(number_of_events): | ||
_assert_event(next(gen), i, ADDED, 1) | ||
with pytest.raises(StopIteration): | ||
next(gen) | ||
|
||
def test_handle_reconnect(self, api_watch_list): | ||
events = [_event(0, ADDED, 1)] | ||
api_watch_list.side_effect = [events, events] | ||
gen = Watcher(WatchListExample).watch() | ||
|
||
_assert_event(next(gen), 0, ADDED, 1) | ||
with pytest.raises(StopIteration): | ||
next(gen) | ||
|
||
def test_handle_changes(self, api_watch_list): | ||
events = [_event(0, ADDED, 1), _event(0, MODIFIED, 2)] | ||
api_watch_list.side_effect = [events] | ||
gen = Watcher(WatchListExample).watch() | ||
|
||
_assert_event(next(gen), 0, ADDED, 1) | ||
_assert_event(next(gen), 0, MODIFIED, 2) | ||
|
||
with pytest.raises(StopIteration): | ||
next(gen) | ||
|
||
def test_complicated(self, api_watch_list): | ||
first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)] | ||
second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)] | ||
third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)] | ||
api_watch_list.side_effect = [first, second, third] | ||
gen = Watcher(WatchListExample).watch() | ||
|
||
# First batch | ||
_assert_event(next(gen), 0, ADDED, 1) | ||
_assert_event(next(gen), 1, ADDED, 1) | ||
_assert_event(next(gen), 2, ADDED, 1) | ||
|
||
# Second batch | ||
_assert_event(next(gen), 1, ADDED, 2) | ||
_assert_event(next(gen), 0, MODIFIED, 2) | ||
|
||
# Third batch | ||
_assert_event(next(gen), 1, DELETED, 2) | ||
_assert_event(next(gen), 2, MODIFIED, 2) | ||
|
||
with pytest.raises(StopIteration): | ||
next(gen) | ||
|
||
|
||
def _event(id, event_type, rv): | ||
metadict = {"name": "name{}".format(id), "resourceVersion": rv} | ||
metadata = ObjectMeta.from_dict(metadict) | ||
wle = WatchListExample(metadata=metadata, value=(id * 100) + rv) | ||
return mock.NonCallableMagicMock(type=event_type, object=wle) | ||
|
||
|
||
def _assert_event(event, id, event_type, rv): | ||
assert event.type == event_type | ||
o = event.object | ||
assert o.kind == "Example" | ||
assert o.metadata.name == "name{}".format(id) | ||
assert o.value == (id * 100) + rv | ||
|
||
|
||
class WatchListExample(Model): | ||
class Meta: | ||
url_template = '/example' | ||
watch_list_url = '/watch/example' | ||
|
||
apiVersion = Field(six.text_type, "example.com/v1") | ||
kind = Field(six.text_type, "Example") | ||
metadata = Field(ObjectMeta) | ||
value = Field(int) | ||
|
||
|
||
class SentinelException(Exception): | ||
pass |