Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Move handler structures to kopf.structs package #332

Merged
merged 4 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
from kopf.reactor import (
lifecycles, # as a separate name on the public namespace
)
from kopf.reactor.errors import (
ErrorsMode,
)
from kopf.reactor.handling import (
TemporaryError,
PermanentError,
Expand Down Expand Up @@ -73,6 +70,9 @@
ABSENT,
PRESENT,
)
from kopf.structs.handlers import (
ErrorsMode,
)
from kopf.toolkits.hierarchies import (
adopt,
label,
Expand Down
5 changes: 2 additions & 3 deletions kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import aiohttp.web

from kopf.reactor import activities
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.structs import callbacks
from kopf.structs import handlers

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,7 +54,7 @@ async def get_health(
activity_results = await activities.run_activity(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.PROBE,
activity=handlers.Activity.PROBE,
)
probing_container.clear()
probing_container.update(activity_results)
Expand Down
40 changes: 19 additions & 21 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ def creation_handler(**kwargs):

from typing import Optional, Callable

from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import registries
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import filters
from kopf.structs import handlers
from kopf.structs import resources

ActivityDecorator = Callable[[callbacks.ActivityFn], callbacks.ActivityFn]
Expand All @@ -33,7 +31,7 @@ def creation_handler(**kwargs):
def startup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -46,7 +44,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.STARTUP,
activity=handlers.Activity.STARTUP,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -56,7 +54,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def cleanup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -69,7 +67,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.CLEANUP,
activity=handlers.Activity.CLEANUP,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -79,7 +77,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def login( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -93,7 +91,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.AUTHENTICATION,
activity=handlers.Activity.AUTHENTICATION,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -103,7 +101,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def probe( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -117,7 +115,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.PROBE,
activity=handlers.Activity.PROBE,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -128,7 +126,7 @@ def resume( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -161,7 +159,7 @@ def create( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -182,7 +180,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=None,
reason=causation.Reason.CREATE,
reason=handlers.Reason.CREATE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -193,7 +191,7 @@ def update( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -214,7 +212,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=None,
reason=causation.Reason.UPDATE,
reason=handlers.Reason.UPDATE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -225,7 +223,7 @@ def delete( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -247,7 +245,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=bool(not optional),
reason=causation.Reason.DELETE,
reason=handlers.Reason.DELETE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -259,7 +257,7 @@ def field( # lgtm[py/similar-function]
field: dicts.FieldSpec,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -318,7 +316,7 @@ def decorator(fn: callbacks.ResourceWatchingFn) -> callbacks.ResourceWatchingFn:
def this( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -379,7 +377,7 @@ def register( # lgtm[py/similar-function]
fn: callbacks.ResourceChangingFn,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down
34 changes: 23 additions & 11 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
* Specific authentication methods, such as the authentication piggybacking,
belong to neither the reactor, nor the engines, nor the client wrappers.
"""
import asyncio
import logging
from typing import NoReturn, Mapping
from typing import NoReturn, Mapping, MutableMapping

from kopf.engines import sleeping
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.reactor import states
from kopf.structs import callbacks
from kopf.structs import credentials
from kopf.structs import handlers as handlers_

logger = logging.getLogger(__name__)

Expand All @@ -38,7 +40,7 @@ def __init__(
self,
msg: str,
*,
outcomes: Mapping[handlers.HandlerId, states.HandlerOutcome],
outcomes: Mapping[handlers_.HandlerId, states.HandlerOutcome],
) -> None:
super().__init__(msg)
self.outcomes = outcomes
Expand Down Expand Up @@ -77,7 +79,7 @@ async def authenticate(
activity_results = await run_activity(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.AUTHENTICATION,
activity=handlers_.Activity.AUTHENTICATION,
)

if activity_results:
Expand All @@ -94,18 +96,28 @@ async def run_activity(
*,
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
activity: causation.Activity,
) -> Mapping[handlers.HandlerId, callbacks.Result]:
activity: handlers_.Activity,
) -> Mapping[handlers_.HandlerId, callbacks.Result]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
cause = causation.ActivityCause(logger=logger, activity=activity)
handlers = registry.activity_handlers.get_handlers(activity=activity)
outcomes = await handling.run_handlers_until_done(
cause=cause,
handlers=handlers,
lifecycle=lifecycle,
)
state = states.State.from_scratch(handlers=handlers)
outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {}
while not state.done:
current_outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
handlers=handlers,
cause=cause,
state=state,
)
outcomes.update(current_outcomes)
state = state.with_outcomes(current_outcomes)
delay = state.delay
if delay:
limited_delay = min(delay, handling.WAITING_KEEPALIVE_INTERVAL)
await sleeping.sleep_or_wait(limited_delay, asyncio.Event())

# Activities assume that all handlers must eventually succeed.
# We raise from the 1st exception only: just to have something real in the tracebacks.
Expand Down
Loading