Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #38 from psycho-ir/33-auto-peering-mode
Browse files Browse the repository at this point in the history
[GH-33] implement auto detection mode for peering.
  • Loading branch information
nolar authored Apr 24, 2019
2 parents f8cbadf + bd4a4fc commit fa4d772
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 14 deletions.
5 changes: 3 additions & 2 deletions docs/peering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ The operator can be instructed to use alternative peering objects::

The operators from different peering objects do not see each other.

The default peering name (i.e. if no peering or standalone options are provided)
is ``default``.
Default behavior
----------------

If there is a peering object with name `default` then it's been used by default as the peering object. Otherwise kopf will run the operator in mode `Standalone`.

Standalone mode
---------------
Expand Down
2 changes: 1 addition & 1 deletion kopf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main():
@click.option('-n', '--namespace', default=None)
@click.option('--standalone', is_flag=True, default=False)
@click.option('--dev', 'priority', flag_value=666)
@click.option('-P', '--peering', type=str, default=PEERING_DEFAULT_NAME)
@click.option('-P', '--peering', type=str, default=None)
@click.option('-p', '--priority', type=int, default=0)
@click.option('-m', '--module', 'modules', multiple=True)
@click.argument('paths', nargs=-1)
Expand Down
43 changes: 41 additions & 2 deletions kopf/reactor/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import iso8601
import kubernetes
from kubernetes.client.rest import ApiException

from kopf.reactor.registry import Resource

Expand All @@ -65,7 +66,7 @@ def __init__(self,
self.id = id
self.peering = peering
self.namespace = namespace
self.priority = (priority)
self.priority = priority
self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else
datetime.timedelta(seconds=int(lifetime)))
self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else
Expand All @@ -78,6 +79,26 @@ def __init__(self,
def __repr__(self):
return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})"

@classmethod
def detect(cls,
standalone: bool,
peering: Optional[str],
**kwargs) -> Optional:
if standalone:
return None

if peering:
if Peer._is_peering_exist(peering):
return cls(peering=peering, **kwargs)
else:
raise Exception(f"The peering {peering} was not found")

if Peer._is_default_peering_setup():
return cls(peering=PEERING_DEFAULT_NAME, **kwargs)

logger.warning(f"Default peering object not found, falling back to the Standalone mode.")
return None

def as_dict(self):
# Only the non-calculated and non-identifying fields.
return {
Expand Down Expand Up @@ -109,6 +130,24 @@ def disappear(self):
self.touch(lifetime=0)
apply_peers([self], peering=self.peering)

@staticmethod
def _is_default_peering_setup():
return Peer._is_peering_exist(PEERING_DEFAULT_NAME)

@staticmethod
def _is_peering_exist(peering: str):
api = kubernetes.client.CustomObjectsApi()
try:
api.get_cluster_custom_object(group=PEERING_CRD_RESOURCE.group,
version=PEERING_CRD_RESOURCE.version,
plural=PEERING_CRD_RESOURCE.plural,
name=peering)
return True
except ApiException as e:
if e.status == 404:
return False
raise


def apply_peers(
peers: Iterable[Peer],
Expand Down Expand Up @@ -190,7 +229,7 @@ async def peers_keepalive(

# How often do we update. Keep limited to avoid k8s api flooding.
# Should be slightly less than the lifetime, enough for a patch request to finish.
await asyncio.sleep(max(1, ourselves.lifetime.total_seconds()-10))
await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10)))
finally:
try:
ourselves.disappear()
Expand Down
13 changes: 4 additions & 9 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

from kopf.reactor.handling import custom_object_handler
from kopf.reactor.lifecycles import get_default_lifecycle
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id
from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id
from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource
from kopf.reactor.watching import streaming_aiter

Expand Down Expand Up @@ -91,7 +91,7 @@ async def watcher(

# Ensure that the event is something we understand and can handle.
if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
logger.warn("Ignoring an unsupported event type: %r", event)
logger.warning("Ignoring an unsupported event type: %r", event)
continue

# Filter out all unrelated events as soon as possible (before queues), and silently.
Expand Down Expand Up @@ -183,13 +183,8 @@ def create_tasks(
tasks = []

# Monitor the peers, unless explicitly disabled.
if not standalone:
ourselves = Peer(
id=detect_own_id(),
priority=priority,
peering=peering,
namespace=namespace,
)
ourselves: Optional[Peer] = Peer.detect(standalone, peering, id=detect_own_id(), priority=priority, namespace=namespace)
if ourselves:
tasks.extend([
asyncio.Task(peers_keepalive(ourselves=ourselves)),
asyncio.Task(watcher(namespace=None, # peering is cluster-object
Expand Down

0 comments on commit fa4d772

Please sign in to comment.