Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Daemons & timers for background processing and reconciliation #330

Merged
merged 20 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5b662b5
Isolate sleep-or-wait to one module, relax the required arguments
nolar Mar 8, 2020
f7cdc68
Deprecate `cause=` kwarg in the handler functions
nolar Mar 8, 2020
b1470df
Return multiple delays from cause-processing routines
nolar Mar 8, 2020
1ca6792
Extract patch-and-sleep step to a separate reusable coroutine
nolar Mar 8, 2020
1671af2
Omit the patch-and-sleep step for the objects known to be gone
nolar Mar 12, 2020
8514b2d
Refactor cause detection and processing (keep the logic)
nolar Mar 8, 2020
7adba02
Move finalizer addition/removal to the top-level processing
nolar Mar 8, 2020
524b6ef
Customise how the handlers are self-represented in logs
nolar Mar 16, 2020
99a32d4
Add @kopf.daemon & @kopf.timer for background tasks
nolar Mar 8, 2020
26eef18
Explain different implementations of daemons termination methods
nolar Mar 30, 2020
41accd8
Document daemons & timers
nolar Mar 8, 2020
d4ac598
Document reconciliation recipes (overview with links)
nolar Mar 9, 2020
86e1734
Add examples of daemons & timers
nolar Mar 9, 2020
c44d1e6
Stop mocking the real sleep when it is switched to the sleeping engine
nolar Mar 18, 2020
2eb91dc
Tests for daemons & timers (only smoke-tests, not much detailed)
nolar Mar 30, 2020
11ddb76
Improve logging when a daemon exits gracefully
nolar Mar 31, 2020
7b3a8cc
Rename `initial_backoff` to `initial_delay` for clarify
nolar Mar 31, 2020
75e227f
Fix some wording and phrasing in the daemon docs (as per PR)
nolar Mar 31, 2020
78734f3
Document the daemons' executors/pools scaling for large clusters
nolar Mar 31, 2020
8748486
Fix few more wording & phrasing issues (as per PR)
nolar Mar 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
375 changes: 375 additions & 0 deletions docs/daemons.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,375 @@
=======
Daemons
=======

Daemons are a special type of handler for background logic that accompanies
the Kubernetes resources during their life cycle.

Unlike event-driven short-running handlers declared with ``@kopf.on``,
daemons are started for every individual object when it is created
(or when an operator is started/restarted while the object exists),
and are capable of running indefinitely (or infinitely) long.

The object's daemons are stopped when the object is deleted
or the whole operator is exiting/restarting.


Spawning
========

To have a daemon accompanying a resource of some kind, decorate a function
with ``@kopf.daemon`` and make it run for long time or forever:

.. code-block:: python

import asyncio
import time
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
async def monitor_kex_async(**kwargs):
while True:
... # check something
await asyncio.sleep(10)

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
def monitor_kex_sync(stopped, **kwargs):
while not stopped:
... # check something
time.sleep(10)

Synchronous functions are executed in threads, asynchronous functions are
executed directly in the asyncio event loop of the operator -- same as with
regular handlers, except that a separate thread pool is used for daemons
to prevent thread pool depletion for regular handlers.
See :doc:`async`.


Termination
===========

The daemons are terminated when either their resource is marked for deletion,
or the operator itself is exiting.

In both cases, the daemons are requested to terminate gracefully by setting
the :kwarg:`stopped` kwarg. The synchronous daemons MUST_, and asynchronous
daemons SHOULD_ check for the value of this flag as often as possible:

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
def monitor_kex(stopped, **kwargs):
while not stopped:
time.sleep(1.0)
print("We are done. Bye.")

The asynchronous daemons can skip these checks if they define the cancellation
timeout (see below). In that case, they can expect an `asyncio.CancelledError`
to be raised at any point of their code (specifically, at any ``await`` clause):

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples', cancellation_timeout=1.0)
async def monitor_kex(**kwargs):
try:
while True:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("We are done. Bye.")

With no cancellation timeout set, cancellation is not performed at all,
as it is unclear for how long should the coroutine be awaited. However,
it is cancelled when the operator exits and stops all "hung" left-over tasks
(not specifically daemons).

.. note::

The MUST_ / SHOULD_ separation is due to Python having no way to terminate
a thread unless the thread exits by its own. The :kwarg:`stopped` flag
is a way to signal the thread it should exit. If :kwarg:`stopped` is not
checked, the synchronous daemons will run forever or until an error happens.

.. _MUST: https://tools.ietf.org/rfc/rfc2119.txt
.. _SHOULD: https://tools.ietf.org/rfc/rfc2119.txt


Timeouts
========

The termination sequence parameters can be controlled when declaring a daemon:

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples',
cancellation_backoff=1.0, cancellation_timeout=3.0)
async def monitor_kex(stopped, **kwargs):
while not stopped:
await asyncio.sleep(1)

There are three stages how the daemon is terminated:

* 1. Graceful termination:
* ``stopped`` is set immediately (unconditionally).
* ``cancellation_backoff`` is awaited (if set).
* 2. Forced termination -- only if ``cancellation_timeout`` is set:
* `asyncio.CancelledError` is raised (for async daemons only).
* ``cancellation_timeout`` is awaited (if set).
* 3a. Giving up and abandoning -- only if ``cancellation_timeout`` is set:
* A `ResourceWarning` is issued for potential OS resource leaks.
* The finalizer is removed, and the object is released for potential deletion.
* 3b. Forever polling -- only if ``cancellation_timeout`` is not set:
* The daemon awaiting continues forever, logging from time to time.
* The finalizer is not removed and the object remains blocked from deletion.

The ``cancellation_timeout`` is measured from the point when the daemon
is cancelled (forced termination begins), not from where the termination
itself begins; i.e., since the moment when the cancellation backoff is over.
The total termination time is ``cancellation_backoff + cancellation_timeout``.

.. warning::

When the operator is exiting, it has its own timeout of 5 seconds
for all "hung" tasks. This includes the daemons after they are requested
to exit gracefully and all timeouts are reached.

If the daemon termination takes longer than this for any reason,
the daemon will be cancelled (by the operator, not by the daemon guard)
regardless of the graceful timeout of the daemon. If this does not help,
the operator will be waiting for all hung tasks until SIGKILL'ed.

.. warning::

If the operator is running in a cluster, there can be timeouts set for a pod
(``terminationGracePeriodSeconds``, the default is 30 seconds).

If the daemon termination is longer than this timeout, the daemons will not
be finished in full at the operator exit, as the pod will be SIGKILL'ed.

Kopf itself does not set any implicit timeouts for the daemons.
Either design the daemons to exit as fast as possible, or configure
``terminationGracePeriodSeconds`` and cancellation timeouts accordingly.


Safe sleep
==========

For synchronous daemons, it is recommended to use ``stopped.wait()``
instead of ``time.sleep()``: the wait will end when either the time is reached
(as with the sleep), or immediately when the stopped flag is set:

.. code-block:: python

import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
def monitor_kex(stopped, **kwargs):
while not stopped:
stopped.wait(10)

For asynchronous handlers, regular ``asyncio.sleep()`` should be sufficient,
as it is cancellable via `asyncio.CancelledError`. If cancellation is neither
configured nor desired, ``stopped.wait()`` can be used too (with ``await``):

.. code-block:: python

import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
async def monitor_kex(stopped, **kwargs):
while not stopped:
await stopped.wait(10)

This way, the daemon will exit as soon as possible when the :kwarg:`stopped`
is set, not when the next sleep is over. Therefore, the sleeps can be of any
duration while the daemon remains terminable (leads to no OS resource leakage).

.. note::

Synchronous and asynchronous daemons get different types of stop-checker:
with synchronous and asynchronous interfaces respectively.
Therefore, they should be used accordingly: without or with ``await``.


Postponing
==========

Normally, daemons are spawned immediately once resource becomes visible
to the operator: i.e. on resource creation or operator startup.

It is possible to postpone the daemon spawning:

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples', initial_delay=30)
async def monitor_kex(stopped, **kwargs):
while True:
await asyncio.sleep(1.0)


The start of the daemon will be delayed by 30 seconds after the resource
creation (or operator startup). For example, this can be used to give some time
for regular event-driven handlers to finish without producing too much activity.


Restarting
==========

It is generally expected that daemons are designed to run forever.
However, it is possible for a daemon to exit prematurely, i.e. before
the resource is deleted or the operator is exiting.

In that case, the daemon will not be restarted again during the lifecycle
of this resource in this operator process (however, it will be spawned again
if the operator restarts). This way, it becomes a long-running equivalent
of on-creation/on-resuming handlers.

To simulate restarting, raise `kopf.TemporaryError` with a delay set.

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
async def monitor_kex(stopped, **kwargs):
await asyncio.sleep(10.0)
raise kopf.TemporaryError("Need to restart.", delay=10)

Same as with regular error handling, a delay of ``None`` means instant restart.


Deletion prevention
===================

Normally, a finalizer is put on the resource if there are daemons running
for it -- to prevent its actual deletion until all the daemons are terminated.

Only after the daemons are terminated, the finalizer is removed to release
the object for actual deletion.

However, it is possible to have daemons that disobey the exiting signals
and continue running after the timeouts. In that case, the finalizer is
anyway removed, and the orphaned daemons are left to themselves.


Resource fields access
======================

The resource's current state is accessible at any time through regular kwargs
(see :doc:`kwargs`): :kwarg:`body`, :kwarg:`spec`, :kwarg:`meta`,
:kwarg:`status`, :kwarg:`uid`, :kwarg:`name`, :kwarg:`namespace`, etc.

The values are "live views" of the current state of the object as it is being
modified during its lifecycle (not frozen as in the event-driven handlers):

.. code-block:: python

import random
import time
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
def monitor_kex(stopped, logger, body, spec, **kwargs):
while not stopped:
logger.info(f"FIELD={spec['field']}")
time.sleep(1)

@kopf.timer('zalando.org', 'v1', 'kopfexamples', interval=2.5)
def modify_kex_sometimes(patch, **kwargs):
patch.spec['field'] = random.randint(0, 100)

Always access the fields through the provided kwargs, and do not store
them in local variables. Internally, Kopf substitutes the whole object's
body on every external change. Storing the field values to the variables
will remember their value as it was at that moment in time,
and will not be updated as the object changes.


Results delivery
================

As with any other handlers, it is possible for the daemons to return
arbitrary JSON-serializable values to be put on the resource's status:

.. code-block:: python

import asyncio
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples')
async def monitor_kex(stopped, **kwargs):
await asyncio.sleep(10.0)
return {'finished': True}


Error handling
==============

The error handling is the same as for all other handlers: see :doc:`errors`:

.. code-block:: python

@kopf.daemon('zalando.org', 'v1', 'kopfexamples',
errors=kopf.ErrorsMode.TEMPORARY, backoff=1, retries=10)
def monitor_kex(retry, **_):
if retry < 3:
raise kopf.TemporaryError("I'll be back!", delay=1)
elif retry < 5:
raise EnvironmentError("Something happened!")
else:
raise kopf.PermanentError("Bye-bye!")

If a permanent error is raised, the daemon will never be restarted again.
Same as when the daemon exits on its own (but this could be reconsidered
in the future).


Filtering
=========

It is also possible to use the existing :doc:`filters`
to only spawn daemons for specific resources:

.. code-block:: python

import time
import kopf

@kopf.daemon('zalando.org', 'v1', 'kopfexamples',
annotations={'some-annotation': 'some-value'},
labels={'some-label': 'some-value'},
when=lambda name, **_: 'some' in name)
def monitor_selected_kexes(stopped, **kwargs):
while not stopped:
time.sleep(1)

Other (non-matching) resources of that kind will be ignored.


System resources
================

.. warning::

A separate OS thread or asyncio task is started
for each individual resource and each individual handler.

Having hundreds or thousands of OS threads or asyncio tasks can consume
system resources significantly. Make sure you only have daemons and timers
with appropriate filters (e.g., by labels, annotations, or so).

For the same reason, prefer to use async handlers (with properly designed
async/await code), since asyncio tasks are a somewhat cheaper than threads.
See :doc:`async` for details.
3 changes: 3 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Kopf: Kubernetes Operators Framework
:caption: Resource handling:

handlers
daemons
timers
kwargs
async
loading
Expand Down Expand Up @@ -62,6 +64,7 @@ Kopf: Kubernetes Operators Framework
deployment
continuity
idempotence
reconciliation
troubleshooting

.. toctree::
Expand Down
Loading