Skip to content

Commit

Permalink
Move service and endpoints processing to a separate processor
Browse files Browse the repository at this point in the history
  • Loading branch information
impl committed Oct 16, 2020
1 parent c9313b7 commit 233264a
Show file tree
Hide file tree
Showing 9 changed files with 626 additions and 579 deletions.
498 changes: 21 additions & 477 deletions python/ambassador/fetch/fetcher.py

Large diffs are not rendered by default.

43 changes: 35 additions & 8 deletions python/ambassador/fetch/k8sobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import collections.abc
import dataclasses
import enum

from ..config import Config

Expand Down Expand Up @@ -45,6 +46,12 @@ def for_knative_networking(cls, kind: str) -> KubernetesGVK:
return cls('networking.internal.knative.dev/v1alpha1', kind)


@enum.unique
class KubernetesObjectScope (enum.Enum):
CLUSTER = enum.auto()
NAMESPACE = enum.auto()


@dataclasses.dataclass(frozen=True)
class KubernetesObjectKey:
"""
Expand All @@ -55,17 +62,26 @@ class KubernetesObjectKey:
namespace: Optional[str]
name: str

@property
def kind(self) -> str:
return self.gvk.kind

@property
def scope(self) -> KubernetesObjectScope:
return KubernetesObjectScope.CLUSTER if self.namespace is None else KubernetesObjectScope.NAMESPACE

class KubernetesObject(collections.abc.Mapping):
@classmethod
def from_object_reference(cls, ref: Dict[str, Any]) -> KubernetesObjectKey:
return cls(KubernetesGVK('v1', ref['kind']), ref.get('namespace'), ref['name'])


class KubernetesObject (collections.abc.Mapping):
"""
Represents a raw object from Kubernetes.
"""

default_namespace: Optional[str]

def __init__(self, delegate: Dict[str, Any], default_namespace: Optional[str] = None) -> None:
def __init__(self, delegate: Dict[str, Any]) -> None:
self.delegate = delegate
self.default_namespace = default_namespace

try:
self.gvk
Expand Down Expand Up @@ -95,10 +111,12 @@ def metadata(self) -> Dict[str, Any]:
return self['metadata']

@property
def namespace(self) -> Optional[str]:
val = self.metadata.get('namespace', self.default_namespace)
def namespace(self) -> str:
val = self.metadata.get('namespace')
if val == '_automatic_':
val = Config.ambassador_namespace
elif val is None:
raise AttributeError(f'{self.__class__.__name__} {self.gvk.domain} {self.name} is cluster-scoped and has no namespace')

return val

Expand All @@ -108,7 +126,16 @@ def name(self) -> str:

@property
def key(self) -> KubernetesObjectKey:
return KubernetesObjectKey(self.gvk, self.namespace, self.name)
try:
namespace = self.namespace
except AttributeError:
namespace = None

return KubernetesObjectKey(self.gvk, namespace, self.name)

@property
def scope(self) -> KubernetesObjectScope:
return self.key.scope

@property
def generation(self) -> int:
Expand Down
13 changes: 10 additions & 3 deletions python/ambassador/fetch/k8sprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from ..config import Config

from .k8sobject import KubernetesGVK, KubernetesObjectKey, KubernetesObject
from .k8sobject import KubernetesGVK, KubernetesObjectScope, KubernetesObjectKey, KubernetesObject
from .resource import ResourceManager


Expand All @@ -30,6 +30,13 @@ def try_process(self, obj: KubernetesObject) -> bool:
if obj.gvk not in self.kinds():
return False

if obj.scope == KubernetesObjectScope.NAMESPACE:
if Config.single_namespace and obj.namespace != Config.ambassador_namespace:
# This should never happen in actual usage, since we shouldn't
# be given things in the wrong namespace. However, in
# development, this can happen a lot.
return False

self._process(obj)
return True

Expand Down Expand Up @@ -83,7 +90,7 @@ def _process(self, obj: KubernetesObject) -> None:
for proc in procs:
proc.try_process(obj)

def finalize(self):
def finalize(self) -> None:
for proc in self.delegates:
proc.finalize()

Expand Down Expand Up @@ -111,7 +118,7 @@ def _process(self, obj: KubernetesObject) -> None:
self.cache.add(obj.key)
self.delegate.try_process(obj)

def finalize(self):
def finalize(self) -> None:
self.delegate.finalize()


Expand Down
4 changes: 2 additions & 2 deletions python/ambassador/fetch/knative.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _emit_mapping(self, obj: KubernetesObject, rule_count: int, rule: Dict[str,
mapping = NormalizedResource.from_data(
'Mapping',
mapping_identifier,
namespace=obj.namespace or 'default',
namespace=obj.namespace,
generation=obj.generation,
labels=obj.labels,
spec=spec,
Expand Down Expand Up @@ -154,7 +154,7 @@ def _update_status(self, obj: KubernetesObject) -> None:
# the relevant domain by doing a DNS lookup on
# kubernetes.default.svc, but this problem appears elsewhere in the
# code as well and probably should just be fixed all at once.
current_lb_domain = f"{self.manager.ambassador_service.name}.{self.manager.ambassador_service.namespace or 'default'}.svc.cluster.local"
current_lb_domain = f"{self.manager.ambassador_service.name}.{self.manager.ambassador_service.namespace}.svc.cluster.local"

observed_ingress: Dict[str, Any] = next(iter(obj.status.get('privateLoadBalancer', {}).get('ingress', [])), {})
observed_lb_domain = observed_ingress.get('domainInternal')
Expand Down
27 changes: 19 additions & 8 deletions python/ambassador/fetch/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,11 @@ class Location:
filename: Optional[str] = None
ocount: int = 1

def mark_annotation(self) -> None:
if self.filename is None:
return
elif self.filename.endswith(':annotation'):
return

self.filename += ':annotation'
def filename_default(self, default: str = 'anonymous YAML') -> str:
return self.filename or default

def __str__(self) -> str:
return f"{self.filename or 'anonymous YAML'}.{self.ocount}"
return f"{self.filename_default()}.{self.ocount}"


class LocationManager:
Expand Down Expand Up @@ -63,3 +58,19 @@ def pop(self) -> Location:
current = self.current
self.current = self.previous.pop()
return current

def mark_annotated(self) -> ContextManager[Location]:
"""
Keeps the current stack, adding an annotation flag to the end of the
filename.
"""
previous_filename = self.current.filename
if self.current.filename and not self.current.filename.endswith(':annotation'):
self.current.filename += ':annotation'

@contextlib.contextmanager
def cleaner():
yield previous_filename
self.current.filename = previous_filename

return cleaner()
88 changes: 53 additions & 35 deletions python/ambassador/fetch/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import logging

from ..config import ACResource, Config
from ..utils import dump_yaml
from ..utils import dump_yaml, parse_yaml

from .k8sobject import KubernetesObject
from .k8sobject import KubernetesObjectScope, KubernetesObject
from .location import LocationManager


Expand All @@ -22,20 +22,28 @@ class NormalizedResource:
rkey: Optional[str] = None

@classmethod
def from_data(cls, kind: str, name: str, namespace: str = 'default', generation: int = 1,
version: str = 'v2', labels: Dict[str, Any] = None, spec: Dict[str, Any] = None,
errors: Optional[str]=None) -> NormalizedResource:
rkey = f'{name}.{namespace}'
def from_data(cls, kind: str, name: str, namespace: Optional[str] = None,
generation: Optional[int] = None, version: str = 'v2',
labels: Optional[Dict[str, Any]] = None,
spec: Dict[str, Any] = None, errors: Optional[str] = None,
rkey: Optional[str] = None) -> NormalizedResource:
if rkey is None:
rkey = f'{name}.{namespace}'

ir_obj = {}
if spec:
ir_obj.update(spec)

ir_obj['apiVersion'] = f'getambassador.io/{version}'
ir_obj['name'] = name
ir_obj['namespace'] = namespace
ir_obj['kind'] = kind
ir_obj['generation'] = generation
ir_obj['name'] = name

if namespace is not None:
ir_obj['namespace'] = namespace

if generation is not None:
ir_obj['generation'] = generation

ir_obj['metadata_labels'] = labels or {}

if errors:
Expand Down Expand Up @@ -68,6 +76,25 @@ def from_kubernetes_object(cls, obj: KubernetesObject) -> NormalizedResource:
spec=obj.spec,
)

@classmethod
def from_kubernetes_object_annotation(cls, obj: KubernetesObject) -> List[NormalizedResource]:
config = obj.annotations.get('getambassador.io/config')
if not config:
return []

def clean_normalize(r: Dict[str, Any]) -> NormalizedResource:
# Annotations should have to pass manual object validation.
r['_force_validation'] = True

if r.get('metadata_labels') is None and obj.labels:
r['metadata_labels'] = obj.labels
if r.get('namespace') is None and obj.scope == KubernetesObjectScope.NAMESPACE:
r['namespace'] = obj.namespace

return NormalizedResource(r, rkey=f'{obj.name}.{obj.namespace}')

return [clean_normalize(r) for r in parse_yaml(config) if r]


class ResourceManager:
"""
Expand All @@ -79,15 +106,13 @@ class ResourceManager:
locations: LocationManager
ambassador_service: Optional[KubernetesObject]
elements: List[ACResource]
services: Dict[str, Dict[str, Any]]

def __init__(self, logger: logging.Logger, aconf: Config):
self.logger = logger
self.aconf = aconf
self.locations = LocationManager()
self.ambassador_service = None
self.elements = []
self.services = {}

@property
def location(self) -> str:
Expand Down Expand Up @@ -116,8 +141,6 @@ def _emit(self, resource: NormalizedResource) -> bool:
(self.location, json.dumps(obj, indent=4, sort_keys=True)))
return True

# self.logger.debug("%s PROCESS %s initial rkey %s" % (self.location, obj['kind'], rkey))

# Is this a pragma object?
if obj['kind'] == 'Pragma':
# Why did I think this was a good idea? [ :) ]
Expand All @@ -132,35 +155,30 @@ def _emit(self, resource: NormalizedResource) -> bool:
return False

if not rkey:
rkey = self.locations.current.filename
rkey = self.locations.current.filename_default('unknown')

rkey = "%s.%d" % (rkey, self.locations.current.ocount)
if obj['kind'] != 'Service':
# Services are unique and don't get an object count appended to
# them.
rkey = "%s.%d" % (rkey, self.locations.current.ocount)

# self.logger.debug("%s PROCESS %s updated rkey to %s" % (self.location, obj['kind'], rkey))
serialization = dump_yaml(obj, default_flow_style=False)

# Force the namespace and metadata_labels, if need be.
# TODO(impl): Remove this?
# if namespace and not obj.get('namespace', None):
# obj['namespace'] = namespace
try:
r = ACResource.from_dict(rkey, rkey, serialization, obj)
self.elements.append(r)
except Exception as e:
self.aconf.post_error(e.args[0])

# Brutal hackery.
if obj['kind'] == 'Service':
self.logger.debug("%s PROCESS saving service %s" % (self.location, obj['name']))
self.services[obj['name']] = obj
else:
# Fine. Fine fine fine.
serialization = dump_yaml(obj, default_flow_style=False)

try:
r = ACResource.from_dict(rkey, rkey, serialization, obj)
self.elements.append(r)
except Exception as e:
self.aconf.post_error(e.args[0])

self.logger.debug("%s PROCESS %s save %s: %s" % (self.location, obj['kind'], rkey, serialization))
self.logger.debug("%s PROCESS %s save %s: %s" % (self.location, obj['kind'], rkey, serialization))

return True

def emit(self, resource: NormalizedResource):
if self._emit(resource):
self.locations.current.ocount += 1

def emit_annotated(self, resources: List[NormalizedResource]):
with self.locations.mark_annotated():
for resource in resources:
self.emit(resource)
Loading

0 comments on commit 233264a

Please sign in to comment.