Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Persist a state of handling progression in a customisable store
Browse files Browse the repository at this point in the history
  • Loading branch information
nolar committed Mar 29, 2020
1 parent 1d045b8 commit f1fff56
Show file tree
Hide file tree
Showing 13 changed files with 871 additions and 149 deletions.
76 changes: 76 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,82 @@ or disconnects. The default is 0.1 seconds (nearly instant, but not flooding).
settings.watching.server_timeout = 10 * 60
.. _progress-storing:

Handling progress
=================

In order to keep the handling state across multiple handling cycles, and to be
resilient to errors and tolerable to restarts and downtimes, the operator keeps
its state in a configured state storage. See more in :doc:`continuity`.

To store the state only in the annotations with your own prefix:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.persistence.storage = kopf.AnnotationsStateStorage(prefix='my-op.example.com')
To store the state only in the status or any other field:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.persistence.storage = kopf.StatusStateStorage(field='status.my-operator')
To store in multiple places (stored in sync, but the first found state will be
used when fetching, i.e. the first storage has precedence):

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.persistence.storage = kopf.MultiStateStorage([
kopf.AnnotationsStateStorage(prefix='my-op.example.com'),
kopf.StatusStateStorage(field='status.my-operator'),
])
The default storage is at both annotations and status, with annotations having
precedence over the status (this is done as a transitioning solution
from status-only storage in the past to annotations-only storage in the future).
The annotations are ``kopf.zalando.org/{id}``,
the status fields are ``status.kopf.progress.{id}``.
It is an equivalent of:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.persistence.storage = kopf.SmartStateStorage()
It is also possible to implement custom state storage instead of storing
the state directly in the resource's fields -- e.g., in external databases.
For this, inherit from `kopf.StateStorage`, and implement its abstract methods
(``fetch()``, ``store()``, ``purge()``, optionally ``flush()``).

.. note::

The legacy behavior is an equivalent of
``kopf.StatusStateStorage(field='status.kopf.progress')``.
However, the ``.status`` stanza is not always stored by the server
for built-in or improperly configured custom resources since Kubernetes 1.16
(see `#321 <https://github.com/zalando-incubator/kopf/issues/321>`_).

The new default "smart" engine is supposed to ensure a smooth upgrade
of Kopf-based operators to the new state location without special upgrade
actions or conversions needed.


Change detection
================

Expand Down
22 changes: 15 additions & 7 deletions docs/continuity.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@ Persistence
===========

Kopf does not have any database. It stores all the information directly
on the objects in the Kubernetes cluster (which means `etcd` usually).
All information is retrieved and stored via the Kubernetes API.
on the objects in the Kubernetes cluster (which means ``etcd`` usually).
All information is retrieved and stored via Kubernetes API.

Specifically:

* The cross-operator exchange is performed via the peering objects.
* The cross-operator exchange is performed via peering objects of type
``KopfPeering`` or ``ClusterKopfPeering`` (API version: ``zalando.org/v1``).
See :doc:`peering` for more info.
* The last handled state of the object is stored in ``metadata.annotations``.
* The last handled state of the object is stored in ``metadata.annotations``
(the ``kopf.zalando.org/last-handled-configuration`` annotation).
It is used to calculate diffs upon changes.
* The handler status (failures, successes, retries, delays) is stored
in ``status.kopf.progress`` (with ``status.kopf`` reserved for any
framework-related information in the future).
* The handlers' state (failures, successes, retries, delays) is stored
in either ``metadata.annotations`` (``kopf.zalando.org/{id}`` keys),
or in ``status.kopf.progress.{id}``, where ``{id}`` is the handler's id.

The persistent state locations can be configured to use different keys,
thus allowing multiple independent operators to handle the same resources
without overlapping with each other. The above-mentioned keys are the defaults.
See how to configure the stores in :doc:`configuration`
(at :ref:`progress-storing`, :ref:`diffbase-storing`).


Restarts
Expand Down
12 changes: 12 additions & 0 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@
StatusDiffBaseStorage,
MultiDiffBaseStorage,
)
from kopf.storage.progress import (
ProgressStorage,
AnnotationsProgressStorage,
StatusProgressStorage,
MultiProgressStorage,
SmartProgressStorage,
)
from kopf.structs.bodies import (
build_object_reference,
build_owner_reference,
Expand Down Expand Up @@ -137,4 +144,9 @@
'AnnotationsDiffBaseStorage',
'StatusDiffBaseStorage',
'MultiDiffBaseStorage',
'ProgressStorage',
'AnnotationsProgressStorage',
'StatusProgressStorage',
'MultiProgressStorage',
'SmartProgressStorage',
]
9 changes: 5 additions & 4 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,19 @@ async def execute(
"no practical use (there are no retries or state tracking).")

# Execute the real handlers (all or few or one of them, as per the lifecycle).
subsettings = subsettings_var.get()
settings: configuration.OperatorSettings = subsettings_var.get()
subhandlers = subregistry.get_handlers(cause=cause)
state = states.State.from_body(body=cause.body, handlers=subhandlers)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=subhandlers)
outcomes = await execute_handlers_once(
lifecycle=lifecycle,
settings=subsettings,
settings=settings,
handlers=subhandlers,
cause=cause,
state=state,
)
state = state.with_outcomes(outcomes)
state.store(patch=cause.patch)
state.store(body=cause.body, patch=cause.patch, storage=storage)
states.deliver_results(outcomes=outcomes, patch=cause.patch)

# Escalate `HandlerChildrenRetry` if the execute should be continued on the next iteration.
Expand Down
9 changes: 6 additions & 3 deletions kopf/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ async def process_resource_event(
extra_fields = registry.resource_changing_handlers[resource].get_extra_fields()
old = settings.persistence.diffbase_storage.fetch(body=body)
new = settings.persistence.diffbase_storage.build(body=body, extra_fields=extra_fields)
old = settings.persistence.progress_storage.clear(essence=old) if old is not None else None
new = settings.persistence.progress_storage.clear(essence=new) if new is not None else None
diff = diffs.diff(old, new)
resource_changing_cause = causation.detect_resource_changing_cause(
raw_event=raw_event,
Expand Down Expand Up @@ -222,7 +224,8 @@ async def process_resource_changing_cause(
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)

handlers = registry.resource_changing_handlers[cause.resource].get_handlers(cause=cause)
state = states.State.from_body(body=cause.body, handlers=handlers)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=handlers)
if handlers:
outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
Expand All @@ -232,12 +235,12 @@ async def process_resource_changing_cause(
state=state,
)
state = state.with_outcomes(outcomes)
state.store(patch=cause.patch)
state.store(body=cause.body, patch=cause.patch, storage=storage)
states.deliver_results(outcomes=outcomes, patch=cause.patch)

if state.done:
logger.info(f"All handlers succeeded for {title}.")
state.purge(patch=cause.patch, body=cause.body)
state.purge(body=cause.body, patch=cause.patch, storage=storage)

done = state.done
delay = state.delay
Expand Down
Loading

0 comments on commit f1fff56

Please sign in to comment.