Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow not using finalizers for timer #896

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kopf/_core/intents/causes.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class SpawningCause(ResourceCause):
specific objects (loggers, etc).
"""
reset: bool
raw_event_type: str

@property
def _kwargs(self) -> Mapping[str, Any]:
Expand Down Expand Up @@ -277,10 +278,12 @@ def detect_watching_cause(

def detect_spawning_cause(
body: bodies.Body,
raw_event_type: str,
**kwargs: Any,
) -> SpawningCause:
return SpawningCause(
body=body,
raw_event_type=raw_event_type,
**kwargs)


Expand Down
10 changes: 8 additions & 2 deletions kopf/_core/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def process_resource_causes(
body=body,
memo=memory.memo,
reset=bool(diff), # only essential changes reset idling, not every event
raw_event_type=raw_event['type'],
) if registry._spawning.has_handlers(resource=resource) else None

changing_cause = causes.detect_changing_cause(
Expand Down Expand Up @@ -331,10 +332,15 @@ async def process_spawning_cause(
if cause.reset:
memory.daemons_memory.idle_reset_time = time.monotonic()

if finalizers.is_deletion_ongoing(cause.body):
daemons_to_stop = {}
for daemon_name, daemon in memory.daemons_memory.running_daemons.items():
if finalizers.is_deletion_ongoing(cause.body) or (not daemon.handler.requires_finalizer and cause.raw_event_type == 'DELETED'):
daemons_to_stop[daemon_name] = daemon

if daemons_to_stop:
stopping_delays = await daemons.stop_daemons(
settings=settings,
daemons=memory.daemons_memory.running_daemons,
daemons=daemons_to_stop,
)
return stopping_delays

Expand Down
3 changes: 2 additions & 1 deletion kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ def timer( # lgtm[py/similar-function]
value: Optional[filters.ValueFilter] = None,
# Operator specification:
registry: Optional[registries.OperatorRegistry] = None,
requires_finalizer: bool = True,
) -> TimerDecorator:
""" ``@kopf.timer()`` handler for the regular events. """
def decorator( # lgtm[py/similar-function]
Expand All @@ -755,7 +756,7 @@ def decorator( # lgtm[py/similar-function]
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value,
initial_delay=initial_delay, requires_finalizer=True,
initial_delay=initial_delay, requires_finalizer=requires_finalizer,
sharp=sharp, idle=idle, interval=interval,
)
real_registry._spawning.append(handler)
Expand Down
6 changes: 3 additions & 3 deletions tests/handling/daemons/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def dummy():


@pytest.fixture()
def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker):
def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker, raw_event=None):
"""
Simulate K8s behaviour locally in memory (some meaningful approximation).
"""
Expand All @@ -52,7 +52,7 @@ def _merge_dicts(src, dst):
else:
dst[key] = val

async def _simulate_cycle(event_object: RawBody):
async def _simulate_cycle(event_object: RawBody, raw_event_type: str = 'irrelevant'):
mocker.resetall()

await process_resource_event(
Expand All @@ -63,7 +63,7 @@ async def _simulate_cycle(event_object: RawBody):
memories=memories,
memobase=Memo(),
indexers=OperatorIndexers(),
raw_event={'type': 'irrelevant', 'object': event_object},
raw_event={'type': raw_event_type, 'object': event_object},
event_queue=asyncio.Queue(),
)

Expand Down
31 changes: 31 additions & 0 deletions tests/handling/daemons/test_timer_triggering.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import kopf

import asyncio


async def test_timer_is_spawned_at_least_once(
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
Expand All @@ -25,6 +27,35 @@ async def fn(**kwargs):
await dummy.wait_for_daemon_done()


async def test_timer_stopped_on_deletion_event_nofinalizer(
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.timer(*resource, id='fn', interval=1.0, requires_finalizer=False)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs

if dummy.mock.call_count >= 2:
dummy.steps['called'].set()
kwargs['stopped']._setter.set() # to exit the cycle

await simulate_cycle({})
await dummy.steps['called'].wait()

assert dummy.mock.call_count == 2
assert dummy.kwargs['retry'] == 0
assert k8s_mocked.sleep.call_count == 2

# Send deleted event, wait for 2 seconds (2x interval time)
# to ensure no more calls.
# verify no additional calls to fn via mock.call_count
await simulate_cycle({}, raw_event_type='DELETED')
await asyncio.sleep(2.0)
assert dummy.mock.call_count == 2
await dummy.wait_for_daemon_done()


async def test_timer_initial_delay_obeyed(
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)
Expand Down