Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #332 from nolar/classes-moved
Browse files Browse the repository at this point in the history
Move handler structures to kopf.structs package
  • Loading branch information
nolar authored Mar 26, 2020
2 parents 77bb639 + 938bf03 commit aa80d5a
Show file tree
Hide file tree
Showing 43 changed files with 216 additions and 252 deletions.
6 changes: 3 additions & 3 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
from kopf.reactor import (
lifecycles, # as a separate name on the public namespace
)
from kopf.reactor.errors import (
ErrorsMode,
)
from kopf.reactor.handling import (
TemporaryError,
PermanentError,
Expand Down Expand Up @@ -73,6 +70,9 @@
ABSENT,
PRESENT,
)
from kopf.structs.handlers import (
ErrorsMode,
)
from kopf.toolkits.hierarchies import (
adopt,
label,
Expand Down
5 changes: 2 additions & 3 deletions kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import aiohttp.web

from kopf.reactor import activities
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.structs import callbacks
from kopf.structs import handlers

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,7 +54,7 @@ async def get_health(
activity_results = await activities.run_activity(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.PROBE,
activity=handlers.Activity.PROBE,
)
probing_container.clear()
probing_container.update(activity_results)
Expand Down
40 changes: 19 additions & 21 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ def creation_handler(**kwargs):

from typing import Optional, Callable

from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import registries
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import filters
from kopf.structs import handlers
from kopf.structs import resources

ActivityDecorator = Callable[[callbacks.ActivityFn], callbacks.ActivityFn]
Expand All @@ -33,7 +31,7 @@ def creation_handler(**kwargs):
def startup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -46,7 +44,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.STARTUP,
activity=handlers.Activity.STARTUP,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -56,7 +54,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def cleanup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -69,7 +67,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.CLEANUP,
activity=handlers.Activity.CLEANUP,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -79,7 +77,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def login( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -93,7 +91,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.AUTHENTICATION,
activity=handlers.Activity.AUTHENTICATION,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -103,7 +101,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
def probe( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -117,7 +115,7 @@ def decorator(fn: callbacks.ActivityFn) -> callbacks.ActivityFn:
handler = handlers.ActivityHandler(
fn=fn, id=real_id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
activity=causation.Activity.PROBE,
activity=handlers.Activity.PROBE,
)
real_registry.activity_handlers.append(handler)
return fn
Expand All @@ -128,7 +126,7 @@ def resume( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -161,7 +159,7 @@ def create( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -182,7 +180,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=None,
reason=causation.Reason.CREATE,
reason=handlers.Reason.CREATE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -193,7 +191,7 @@ def update( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -214,7 +212,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=None,
reason=causation.Reason.UPDATE,
reason=handlers.Reason.UPDATE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -225,7 +223,7 @@ def delete( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -247,7 +245,7 @@ def decorator(fn: callbacks.ResourceChangingFn) -> callbacks.ResourceChangingFn:
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
labels=labels, annotations=annotations, when=when,
initial=None, deleted=None, requires_finalizer=bool(not optional),
reason=causation.Reason.DELETE,
reason=handlers.Reason.DELETE,
)
real_registry.resource_changing_handlers[real_resource].append(handler)
return fn
Expand All @@ -259,7 +257,7 @@ def field( # lgtm[py/similar-function]
field: dicts.FieldSpec,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -318,7 +316,7 @@ def decorator(fn: callbacks.ResourceWatchingFn) -> callbacks.ResourceWatchingFn:
def this( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -379,7 +377,7 @@ def register( # lgtm[py/similar-function]
fn: callbacks.ResourceChangingFn,
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
errors: Optional[handlers.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down
34 changes: 23 additions & 11 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
* Specific authentication methods, such as the authentication piggybacking,
belong to neither the reactor, nor the engines, nor the client wrappers.
"""
import asyncio
import logging
from typing import NoReturn, Mapping
from typing import NoReturn, Mapping, MutableMapping

from kopf.engines import sleeping
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.reactor import states
from kopf.structs import callbacks
from kopf.structs import credentials
from kopf.structs import handlers as handlers_

logger = logging.getLogger(__name__)

Expand All @@ -38,7 +40,7 @@ def __init__(
self,
msg: str,
*,
outcomes: Mapping[handlers.HandlerId, states.HandlerOutcome],
outcomes: Mapping[handlers_.HandlerId, states.HandlerOutcome],
) -> None:
super().__init__(msg)
self.outcomes = outcomes
Expand Down Expand Up @@ -77,7 +79,7 @@ async def authenticate(
activity_results = await run_activity(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.AUTHENTICATION,
activity=handlers_.Activity.AUTHENTICATION,
)

if activity_results:
Expand All @@ -94,18 +96,28 @@ async def run_activity(
*,
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
activity: causation.Activity,
) -> Mapping[handlers.HandlerId, callbacks.Result]:
activity: handlers_.Activity,
) -> Mapping[handlers_.HandlerId, callbacks.Result]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
cause = causation.ActivityCause(logger=logger, activity=activity)
handlers = registry.activity_handlers.get_handlers(activity=activity)
outcomes = await handling.run_handlers_until_done(
cause=cause,
handlers=handlers,
lifecycle=lifecycle,
)
state = states.State.from_scratch(handlers=handlers)
outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {}
while not state.done:
current_outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
handlers=handlers,
cause=cause,
state=state,
)
outcomes.update(current_outcomes)
state = state.with_outcomes(current_outcomes)
delay = state.delay
if delay:
limited_delay = min(delay, handling.WAITING_KEEPALIVE_INTERVAL)
await sleeping.sleep_or_wait(limited_delay, asyncio.Event())

# Activities assume that all handlers must eventually succeed.
# We raise from the 1st exception only: just to have something real in the tracebacks.
Expand Down
Loading

0 comments on commit aa80d5a

Please sign in to comment.