From 95bc40a0fa953bc746b5bf59bfc8a3ee824a6824 Mon Sep 17 00:00:00 2001 From: Greg Sterin Date: Sat, 19 Feb 2022 13:22:46 -0800 Subject: [PATCH] Allow not using finalizers for timer --- kopf/_core/intents/causes.py | 3 ++ kopf/_core/reactor/processing.py | 10 ++++-- kopf/on.py | 3 +- tests/handling/daemons/conftest.py | 6 ++-- .../handling/daemons/test_timer_triggering.py | 31 +++++++++++++++++++ 5 files changed, 47 insertions(+), 6 deletions(-) diff --git a/kopf/_core/intents/causes.py b/kopf/_core/intents/causes.py index 8933112e..295c38ba 100644 --- a/kopf/_core/intents/causes.py +++ b/kopf/_core/intents/causes.py @@ -193,6 +193,7 @@ class SpawningCause(ResourceCause): specific objects (loggers, etc). """ reset: bool + raw_event_type: str @property def _kwargs(self) -> Mapping[str, Any]: @@ -277,10 +278,12 @@ def detect_watching_cause( def detect_spawning_cause( body: bodies.Body, + raw_event_type: str, **kwargs: Any, ) -> SpawningCause: return SpawningCause( body=body, + raw_event_type=raw_event_type, **kwargs) diff --git a/kopf/_core/reactor/processing.py b/kopf/_core/reactor/processing.py index db6d8909..30e3cc0b 100644 --- a/kopf/_core/reactor/processing.py +++ b/kopf/_core/reactor/processing.py @@ -185,6 +185,7 @@ async def process_resource_causes( body=body, memo=memory.memo, reset=bool(diff), # only essential changes reset idling, not every event + raw_event_type=raw_event['type'], ) if registry._spawning.has_handlers(resource=resource) else None changing_cause = causes.detect_changing_cause( @@ -331,10 +332,15 @@ async def process_spawning_cause( if cause.reset: memory.daemons_memory.idle_reset_time = time.monotonic() - if finalizers.is_deletion_ongoing(cause.body): + daemons_to_stop = {} + for daemon_name, daemon in memory.daemons_memory.running_daemons.items(): + if finalizers.is_deletion_ongoing(cause.body) or (not daemon.handler.requires_finalizer and cause.raw_event_type == 'DELETED'): + daemons_to_stop[daemon_name] = daemon + + if daemons_to_stop: stopping_delays = await daemons.stop_daemons( settings=settings, - daemons=memory.daemons_memory.running_daemons, + daemons=daemons_to_stop, ) return stopping_delays diff --git a/kopf/on.py b/kopf/on.py index 694238ee..938e4845 100644 --- a/kopf/on.py +++ b/kopf/on.py @@ -735,6 +735,7 @@ def timer( # lgtm[py/similar-function] value: Optional[filters.ValueFilter] = None, # Operator specification: registry: Optional[registries.OperatorRegistry] = None, + requires_finalizer: bool = True, ) -> TimerDecorator: """ ``@kopf.timer()`` handler for the regular events. """ def decorator( # lgtm[py/similar-function] @@ -755,7 +756,7 @@ def decorator( # lgtm[py/similar-function] errors=errors, timeout=timeout, retries=retries, backoff=backoff, selector=selector, labels=labels, annotations=annotations, when=when, field=real_field, value=value, - initial_delay=initial_delay, requires_finalizer=True, + initial_delay=initial_delay, requires_finalizer=requires_finalizer, sharp=sharp, idle=idle, interval=interval, ) real_registry._spawning.append(handler) diff --git a/tests/handling/daemons/conftest.py b/tests/handling/daemons/conftest.py index 97dace52..f914d791 100644 --- a/tests/handling/daemons/conftest.py +++ b/tests/handling/daemons/conftest.py @@ -40,7 +40,7 @@ def dummy(): @pytest.fixture() -def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker): +def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker, raw_event=None): """ Simulate K8s behaviour locally in memory (some meaningful approximation). """ @@ -52,7 +52,7 @@ def _merge_dicts(src, dst): else: dst[key] = val - async def _simulate_cycle(event_object: RawBody): + async def _simulate_cycle(event_object: RawBody, raw_event_type: str = 'irrelevant'): mocker.resetall() await process_resource_event( @@ -63,7 +63,7 @@ async def _simulate_cycle(event_object: RawBody): memories=memories, memobase=Memo(), indexers=OperatorIndexers(), - raw_event={'type': 'irrelevant', 'object': event_object}, + raw_event={'type': raw_event_type, 'object': event_object}, event_queue=asyncio.Queue(), ) diff --git a/tests/handling/daemons/test_timer_triggering.py b/tests/handling/daemons/test_timer_triggering.py index ed074979..c818520b 100644 --- a/tests/handling/daemons/test_timer_triggering.py +++ b/tests/handling/daemons/test_timer_triggering.py @@ -2,6 +2,8 @@ import kopf +import asyncio + async def test_timer_is_spawned_at_least_once( resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): @@ -25,6 +27,35 @@ async def fn(**kwargs): await dummy.wait_for_daemon_done() +async def test_timer_stopped_on_deletion_event_nofinalizer( + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + caplog.set_level(logging.DEBUG) + + @kopf.timer(*resource, id='fn', interval=1.0, requires_finalizer=False) + async def fn(**kwargs): + dummy.mock() + dummy.kwargs = kwargs + + if dummy.mock.call_count >= 2: + dummy.steps['called'].set() + kwargs['stopped']._setter.set() # to exit the cycle + + await simulate_cycle({}) + await dummy.steps['called'].wait() + + assert dummy.mock.call_count == 2 + assert dummy.kwargs['retry'] == 0 + assert k8s_mocked.sleep.call_count == 2 + + # Send deleted event, wait for 2 seconds (2x interval time) + # to ensure no more calls. + # verify no additional calls to fn via mock.call_count + await simulate_cycle({}, raw_event_type='DELETED') + await asyncio.sleep(2.0) + assert dummy.mock.call_count == 2 + await dummy.wait_for_daemon_done() + + async def test_timer_initial_delay_obeyed( resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): caplog.set_level(logging.DEBUG)