This repository has been archived by the owner on Sep 14, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Tests for daemons & timers (only smoke-tests, not much detailed)
- Loading branch information
Showing
9 changed files
with
886 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import asyncio | ||
import time | ||
import unittest.mock | ||
|
||
import freezegun | ||
import pytest | ||
|
||
import kopf | ||
from kopf.reactor.processing import process_resource_event | ||
from kopf.structs.bodies import RawBody | ||
from kopf.structs.containers import ResourceMemories | ||
|
||
|
||
class DaemonDummy: | ||
|
||
def __init__(self): | ||
super().__init__() | ||
self.mock = unittest.mock.MagicMock() | ||
self.kwargs = {} | ||
self.steps = { | ||
'called': asyncio.Event(), | ||
'finish': asyncio.Event(), | ||
'error': asyncio.Event(), | ||
} | ||
|
||
async def wait_for_daemon_done(self): | ||
stopped = self.kwargs['stopped'] | ||
await stopped.wait() | ||
while not stopped._stopper.reason & stopped._stopper.reason.DONE: | ||
await asyncio.sleep(0) # give control back to asyncio event loop | ||
|
||
|
||
@pytest.fixture() | ||
def dummy(): | ||
return DaemonDummy() | ||
|
||
|
||
@pytest.fixture() | ||
def memories(): | ||
return ResourceMemories() | ||
|
||
|
||
@pytest.fixture() | ||
def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker): | ||
""" | ||
Simulate K8s behaviour locally in memory (some meaningful approximation). | ||
""" | ||
|
||
def _merge_dicts(src, dst): | ||
for key, val in src.items(): | ||
if isinstance(val, dict) and key in dst: | ||
_merge_dicts(src[key], dst[key]) | ||
else: | ||
dst[key] = val | ||
|
||
async def _simulate_cycle(event_object: RawBody): | ||
mocker.resetall() | ||
|
||
await process_resource_event( | ||
lifecycle=kopf.lifecycles.all_at_once, | ||
registry=registry, | ||
settings=settings, | ||
resource=resource, | ||
memories=memories, | ||
raw_event={'type': 'irrelevant', 'object': event_object}, | ||
replenished=asyncio.Event(), | ||
event_queue=asyncio.Queue(), | ||
) | ||
|
||
# Do the same as k8s does: merge the patches into the object. | ||
for call in k8s_mocked.patch_obj.call_args_list: | ||
_merge_dicts(call[1]['patch'], event_object) | ||
|
||
return _simulate_cycle | ||
|
||
|
||
@pytest.fixture() | ||
def frozen_time(): | ||
""" | ||
A helper to simulate time movements to step over long sleeps/timeouts. | ||
""" | ||
# TODO LATER: Either freezegun should support the system clock, or find something else. | ||
with freezegun.freeze_time("2020-01-01 00:00:00") as frozen: | ||
# Use freezegun-supported time instead of system clocks -- for testing purposes only. | ||
# NB: Patch strictly after the time is frozen -- to use fake_time(), not real time(). | ||
with unittest.mock.patch('time.monotonic', time.time), \ | ||
unittest.mock.patch('time.perf_counter', time.time): | ||
yield frozen | ||
|
||
|
||
# The time-driven tests mock the sleeps, and shift the time as much as it was requested to sleep. | ||
# This makes the sleep realistic for the app code, though executed instantly for the tests. | ||
@pytest.fixture() | ||
def manual_time(k8s_mocked, frozen_time): | ||
async def sleep_or_wait_substitute(delay, *_, **__): | ||
if delay is None: | ||
pass | ||
elif isinstance(delay, float): | ||
frozen_time.tick(delay) | ||
else: | ||
frozen_time.tick(min(delay)) | ||
|
||
k8s_mocked.sleep_or_wait.side_effect = sleep_or_wait_substitute | ||
yield frozen_time | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
import logging | ||
|
||
import kopf | ||
from kopf.storage.finalizers import FINALIZER | ||
|
||
|
||
async def test_daemon_stopped_on_permanent_error( | ||
registry, resource, dummy, manual_time, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
backoff=0.01) | ||
async def fn(**kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
raise kopf.PermanentError("boo!") | ||
|
||
event_object = {'metadata': {'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_object) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert dummy.mock.call_count == 1 | ||
assert k8s_mocked.patch_obj.call_count == 0 | ||
assert k8s_mocked.sleep_or_wait.call_count == 1 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] is None | ||
|
||
assert_logs([ | ||
"Daemon 'fn' failed permanently: boo!", | ||
"Daemon 'fn' has exited on its own", | ||
], prohibited=[ | ||
"Daemon 'fn' succeeded.", | ||
]) | ||
|
||
|
||
async def test_daemon_stopped_on_arbitrary_errors_with_mode_permanent( | ||
registry, resource, dummy, manual_time, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
errors=kopf.ErrorsMode.PERMANENT, backoff=0.01) | ||
async def fn(**kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
raise Exception("boo!") | ||
|
||
event_object = {'metadata': {'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_object) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert dummy.mock.call_count == 1 | ||
assert k8s_mocked.sleep_or_wait.call_count == 1 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] is None | ||
|
||
assert_logs([ | ||
"Daemon 'fn' failed with an exception. Will stop.", | ||
"Daemon 'fn' has exited on its own", | ||
], prohibited=[ | ||
"Daemon 'fn' succeeded.", | ||
]) | ||
|
||
|
||
async def test_daemon_retried_on_temporary_error( | ||
registry, settings, resource, dummy, manual_time, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
backoff=1.0) | ||
async def fn(retry, **kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
if not retry: | ||
raise kopf.TemporaryError("boo!", delay=1.0) | ||
else: | ||
dummy.steps['finish'].set() | ||
|
||
event_object = {'metadata': {'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_object) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.steps['finish'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert k8s_mocked.sleep_or_wait.call_count == 2 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] is None | ||
|
||
assert_logs([ | ||
"Daemon 'fn' failed temporarily: boo!", | ||
"Daemon 'fn' succeeded.", | ||
"Daemon 'fn' has exited on its own", | ||
]) | ||
|
||
|
||
async def test_daemon_retried_on_arbitrary_error_with_mode_temporary( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
errors=kopf.ErrorsMode.TEMPORARY, backoff=1.0) | ||
async def fn(retry, **kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
if not retry: | ||
raise Exception("boo!") | ||
else: | ||
dummy.steps['finish'].set() | ||
|
||
event_object = {'metadata': {'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_object) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.steps['finish'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert k8s_mocked.sleep_or_wait.call_count == 2 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] is None | ||
|
||
assert_logs([ | ||
"Daemon 'fn' failed with an exception. Will retry.", | ||
"Daemon 'fn' succeeded.", | ||
"Daemon 'fn' has exited on its own", | ||
]) | ||
|
||
|
||
async def test_daemon_retried_until_retries_limit( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
retries=3) | ||
async def fn(**kwargs): | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
raise kopf.TemporaryError("boo!", delay=1.0) | ||
|
||
await simulate_cycle({}) | ||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert k8s_mocked.sleep_or_wait.call_count == 4 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[2][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[3][0][0] is None | ||
|
||
|
||
async def test_daemon_retried_until_timeout( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
timeout=3.0) | ||
async def fn(**kwargs): | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
raise kopf.TemporaryError("boo!", delay=1.0) | ||
|
||
await simulate_cycle({}) | ||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert k8s_mocked.sleep_or_wait.call_count == 4 # one for each retry | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[2][0][0] == 1.0 # [call#][args/kwargs][arg#] | ||
assert k8s_mocked.sleep_or_wait.call_args_list[3][0][0] is None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import logging | ||
|
||
import pytest | ||
|
||
import kopf | ||
from kopf.storage.finalizers import FINALIZER | ||
|
||
|
||
# We assume that the handler filtering is tested in details elsewhere (for all handlers). | ||
# Here, we only test if it is applied or not applied. | ||
|
||
|
||
async def test_daemon_filtration_satisfied( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT}, | ||
annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT}) | ||
async def fn(**kwargs): | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
|
||
event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'}, | ||
'annotations': {'x': 'value', 'y': '...'}, | ||
'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_body) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
|
||
@pytest.mark.parametrize('labels, annotations', [ | ||
# Annotations mismatching (but labels are matching): | ||
({'a': 'value', 'b': '...'}, {'x': 'mismatching-value', 'b': '...'}, ), # x must be "value". | ||
({'a': 'value', 'b': '...'}, {'x': 'value', 'y': '...', 'z': '...'}), # z must be absent | ||
({'a': 'value', 'b': '...'}, {'x': 'value'}), # y must be present | ||
# labels mismatching (but annotations are matching): | ||
({'a': 'mismatching-value', 'b': '...'}, {'x': 'value', 'y': '...'}), | ||
({'a': 'value', 'b': '...', 'c': '...'}, {'x': 'value', 'y': '...'}), | ||
({'a': 'value'}, {'x': 'value', 'y': '...'}), | ||
]) | ||
async def test_daemon_filtration_mismatched( | ||
registry, resource, mocker, labels, annotations, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
spawn_resource_daemons = mocker.patch('kopf.reactor.daemons.spawn_resource_daemons') | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT}, | ||
annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT}) | ||
async def fn(**kwargs): | ||
pass | ||
|
||
event_body = {'metadata': {'labels': labels, | ||
'annotations': annotations, | ||
'finalizers': [FINALIZER]}} | ||
await simulate_cycle(event_body) | ||
|
||
assert spawn_resource_daemons.called | ||
assert spawn_resource_daemons.call_args_list[0][1]['handlers'] == [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import logging | ||
|
||
import kopf | ||
|
||
|
||
async def test_daemon_is_spawned_at_least_once( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn') | ||
async def fn(**kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
|
||
await simulate_cycle({}) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert dummy.mock.call_count == 1 # not restarted | ||
|
||
|
||
async def test_daemon_initial_backoff_obeyed( | ||
registry, resource, dummy, | ||
caplog, assert_logs, k8s_mocked, simulate_cycle): | ||
caplog.set_level(logging.DEBUG) | ||
|
||
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn', | ||
initial_backoff=1.0) | ||
async def fn(**kwargs): | ||
dummy.mock() | ||
dummy.kwargs = kwargs | ||
dummy.steps['called'].set() | ||
|
||
await simulate_cycle({}) | ||
|
||
await dummy.steps['called'].wait() | ||
await dummy.wait_for_daemon_done() | ||
|
||
assert k8s_mocked.sleep_or_wait.call_count >= 1 | ||
assert k8s_mocked.sleep_or_wait.call_count <= 2 # one optional extra call for sleep(None) | ||
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] |
Oops, something went wrong.