diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a6992d5b2..c4699abec4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,8 +75,11 @@ Please see the [Envoy documentation](https://www.envoyproxy.io/docs/envoy/latest - Bugfix: Make sure that `labels` specifying headers with extra attributes are correctly supported again ([#3137]) - Bugfix: Support Consul services when the `ConsulResolver` and the `Mapping` aren't in the same namespace, and legacy mode is not enabled. - Feature: Ambassador now reads the ENVOY_CONCURRENCY environment variable to optionally set the [--concurrency](https://www.envoyproxy.io/docs/envoy/latest/operations/cli#cmdoption-concurrency) command line option when launching Envoy. This controls the number of worker threads used to serve requests and can be used to fine-tune system resource usage. +- Bugfix: Fix failure to start when one or more IngressClasses are present in a cluster ([#3142]). +- Bugfix: Prevent potential reconcile loop when updating the status of an Ingress. [#3137]: https://github.com/datawire/ambassador/issues/3137 +[#3142]: https://github.com/datawire/ambassador/issues/3142 ## [1.10.0] January 04, 2021 [1.10.0]: https://github.com/datawire/ambassador/compare/v1.9.1...v1.10.0 diff --git a/builder/sync-excludes.txt b/builder/sync-excludes.txt index 24d8c2398d..ac480d5152 100644 --- a/builder/sync-excludes.txt +++ b/builder/sync-excludes.txt @@ -11,6 +11,7 @@ __pycache__ .pytest_cache .mypy_cache .dmypy.json +.coverage go.sum bin/ diff --git a/python/ambassador/config/config.py b/python/ambassador/config/config.py index c7d38ce37b..3dc93d7c99 100644 --- a/python/ambassador/config/config.py +++ b/python/ambassador/config/config.py @@ -148,8 +148,6 @@ def __init__(self, schema_dir_path: Optional[str]=None) -> None: self.logger.debug("SCHEMA DIR %s" % os.path.abspath(self.schema_dir_path)) self.k8s_status_updates: Dict[str, Tuple[str, str, Optional[Dict[str, Any]]]] = {} # Tuple is (name, namespace, status_json) - self.k8s_ingresses: Dict[str, Any] = {} - self.k8s_ingress_classes: Dict[str, Any] = {} self.pod_labels: Dict[str, str] = {} self._reset() diff --git a/python/ambassador/fetch/dependency.py b/python/ambassador/fetch/dependency.py new file mode 100644 index 0000000000..7123006b9a --- /dev/null +++ b/python/ambassador/fetch/dependency.py @@ -0,0 +1,183 @@ +from typing import Any, Collection, Iterator, Mapping, MutableSet, Optional, Protocol, Sequence, Type, TypeVar + +from collections import defaultdict +import dataclasses + +from .k8sobject import KubernetesObject + + +class Dependency (Protocol): + """ + Dependencies link information provided by processors of a given Watt + invocation to other processors that need the processed result. This results + in an ordering of keys so that processors can be dependent on each other + without direct knowledge of where data is coming from. + """ + + def watt_key(self) -> str: ... + + +class ServiceDependency (Dependency): + """ + A dependency that exposes information about the Kubernetes service for + Ambassador itself. + """ + + ambassador_service: Optional[KubernetesObject] + + def __init__(self) -> None: + self.ambassador_service = None + + def watt_key(self) -> str: + return 'service' + + +class SecretDependency (Dependency): + """ + A dependency that is satisfied once secret information has been mapped and + emitted. + """ + + def watt_key(self) -> str: + return 'secret' + + +class IngressClassesDependency (Dependency): + """ + A dependency that provides the list of ingress classes that are valid (i.e., + have the proper controller) for this cluster. + """ + + ingress_classes: MutableSet[str] + + def __init__(self): + self.ingress_classes = set() + + def watt_key(self) -> str: + return 'ingressclasses' + + +D = TypeVar('D', bound=Dependency) + + +class DependencyMapping (Protocol): + + def __contains__(self, key: Type[D]) -> bool: ... + def __getitem__(self, key: Type[D]) -> D: ... + + +class DependencyInjector: + """ + Each processor instance is provided with a dependency injector that allows + it to declare what dependencies it provides as part of its processing and + what dependencies it wants to do its processing. + + Note that dependencies need not be fulfilled; for example, nothing may + provide information about the Ambassador service or the list of valid + ingress classes. Processors should be prepared to deal with the absence of + valid data when they run. + """ + + wants: MutableSet[Type[Dependency]] + provides: MutableSet[Type[Dependency]] + deps: DependencyMapping + + def __init__(self, deps: DependencyMapping) -> None: + self.wants = set() + self.provides = set() + self.deps = deps + + def want(self, cls: Type[D]) -> D: + self.wants.add(cls) + return self.deps[cls] + + def provide(self, cls: Type[D]) -> D: + self.provides.add(cls) + return self.deps[cls] + + +class DependencyGraph: + """ + Once dependency relationships are known, this class provides the ability to + link them holistically and traverse them in topological order. It is most + useful in the context of the sorted_watt_keys() method of the + DependencyManager. + """ + + @dataclasses.dataclass + class Vertex: + out: MutableSet[Any] + in_count: int + + vertices: Mapping[Any, Vertex] + + def __init__(self) -> None: + self.vertices = defaultdict(lambda: DependencyGraph.Vertex(out=set(), in_count=0)) + + def connect(self, a: Any, b: Any) -> None: + if b not in self.vertices[a].out: + self.vertices[a].out.add(b) + self.vertices[b].in_count += 1 + + def traverse(self) -> Iterator[Any]: + """ + Returns the items in this graph in topological order. + """ + + if len(self.vertices) == 0: + return + + # This method implements Kahn's algorithm. See + # https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm for + # more information. + + # Create a copy of the counts of each inbound edge so we can mutate + # them. + in_counts = {obj: vertex.in_count for obj, vertex in self.vertices.items()} + + # Find the roots of the graph. + queue = [obj for obj, in_count in in_counts.items() if in_count == 0] + + # No roots of a graph with at least one vertex indicates a cycle. + if len(queue) == 0: + raise ValueError('cyclic') + + while len(queue) > 0: + cur = queue.pop(0) + yield cur + + for obj in self.vertices[cur].out: + in_counts[obj] -= 1 + if in_counts[obj] == 0: + queue.append(obj) + + assert sum(in_counts.values()) == 0, 'Traversal did not reach every vertex exactly once' + + +class DependencyManager: + """ + A manager that provides access to a set of dependencies for arbitrary object + instances and the ability to compute a sorted list of Watt keys that + represent the processing order for the dependencies. + """ + + deps: DependencyMapping + injectors: Mapping[Any, DependencyInjector] + + def __init__(self, deps: Collection[D]) -> None: + self.deps = {dep.__class__: dep for dep in deps} + self.injectors = defaultdict(lambda: DependencyInjector(self.deps)) + + def for_instance(self, obj: Any) -> DependencyInjector: + return self.injectors[obj] + + def sorted_watt_keys(self) -> Sequence[Dependency]: + g = DependencyGraph() + + for obj, injector in self.injectors.items(): + for cls in injector.provides: + g.connect(obj, cls) + for cls in injector.wants: + g.connect(cls, obj) + + return [self.deps[obj].watt_key() for obj in g.traverse() if obj in self.deps] diff --git a/python/ambassador/fetch/fetcher.py b/python/ambassador/fetch/fetcher.py index 866fd1a8b2..9a9fb23efe 100644 --- a/python/ambassador/fetch/fetcher.py +++ b/python/ambassador/fetch/fetcher.py @@ -9,6 +9,7 @@ from ..config import ACResource, Config from ..utils import parse_yaml, parse_json, dump_json +from .dependency import DependencyManager, IngressClassesDependency, SecretDependency, ServiceDependency from .resource import NormalizedResource, ResourceManager from .k8sobject import KubernetesGVK, KubernetesObject from .k8sprocessor import ( @@ -19,11 +20,11 @@ ) from .ambassador import AmbassadorProcessor from .secret import SecretProcessor +from .ingress import IngressClassProcessor, IngressProcessor from .service import ServiceProcessor from .knative import KnativeIngressProcessor AnyDict = Dict[str, Any] -HandlerResult = Optional[Tuple[str, List[AnyDict]]] # XXX ALL OF THE BELOW COMMENT IS PROBABLY OUT OF DATE. (Flynn, 2019-10-29) # @@ -61,21 +62,24 @@ def __init__(self, logger: logging.Logger, aconf: 'Config', skip_init_dir: bool=False, watch_only=False) -> None: self.aconf = aconf self.logger = logger - self.manager = ResourceManager(self.logger, self.aconf) + self.manager = ResourceManager(self.logger, self.aconf, DependencyManager([ + ServiceDependency(), + SecretDependency(), + IngressClassesDependency(), + ])) self.k8s_processor = DeduplicatingKubernetesProcessor(AggregateKubernetesProcessor([ CountingKubernetesProcessor(self.aconf, KubernetesGVK.for_knative_networking('Ingress'), 'knative_ingress'), AmbassadorProcessor(self.manager), SecretProcessor(self.manager), + IngressClassProcessor(self.manager), + IngressProcessor(self.manager), ServiceProcessor(self.manager, watch_only=watch_only), KnativeIngressProcessor(self.manager), ])) self.alerted_about_labels = False - # For deduplicating objects coming in from watt - self.k8s_parsed: Dict[str, bool] = {} - # Deltas, for managing the cache. self.deltas: List[Dict[str, Union[str, Dict[str, str]]]] = [] @@ -152,38 +156,26 @@ def load_from_filesystem(self, config_dir_path, recurse: bool=False, if finalize: self.finalize() - def parse_yaml(self, serialization: str, k8s=False, rkey: Optional[str]=None, - filename: Optional[str]=None, finalize: bool=True, metadata_labels: Optional[Dict[str, str]]=None) -> None: + def parse_yaml(self, serialization: str, k8s=False, rkey: Optional[str] = None, + filename: Optional[str] = None, finalize: bool = True) -> None: # self.logger.info(f"RF YAML: {serialization}") # Expand environment variables allowing interpolation in manifests. serialization = os.path.expandvars(serialization) - try: - # UGH. This parse_yaml is the one we imported from utils. XXX This needs to be fixed. - objects = parse_yaml(serialization) - self.parse_object(objects=objects, k8s=k8s, rkey=rkey, filename=filename) - except yaml.error.YAMLError as e: - self.aconf.post_error("%s: could not parse YAML: %s" % (self.location, e)) - - if finalize: - self.finalize() - - def parse_json(self, serialization: str, k8s=False, rkey: Optional[str]=None, - filename: Optional[str]=None, finalize: bool=True) -> None: - # self.logger.debug("%s: parsing %d byte%s of YAML:\n%s" % - # (self.location, len(serialization), "" if (len(serialization) == 1) else "s", - # serialization)) - - # Expand environment variables allowing interpolation in manifests. - serialization = os.path.expandvars(serialization) + if not filename: + filename = self.manager.locations.current.filename - try: - # This parse_json is the one we imported from utils. XXX This (also?) needs to be fixed. - objects = parse_json(serialization) - self.parse_object(objects=objects, k8s=k8s, rkey=rkey, filename=filename) - except json.decoder.JSONDecodeError as e: - self.aconf.post_error("%s: could not parse YAML: %s" % (self.location, e)) + with self.manager.locations.push(filename=filename): + try: + # UGH. This parse_yaml is the one we imported from utils. XXX This needs to be fixed. + for obj in parse_yaml(serialization): + if k8s: + self.handle_k8s(obj) + else: + self.manager.emit(NormalizedResource(obj, rkey=rkey)) + except yaml.error.YAMLError as e: + self.aconf.post_error("%s: could not parse YAML: %s" % (self.location, e)) if finalize: self.finalize() @@ -247,7 +239,7 @@ def parse_watt(self, serialization: str, finalize: bool=True) -> None: # Can't work with this at _all_. self.logger.error(f"skipping invalid object with no kind: {obj}") continue - + # We can't use watt_k8s.setdefault() here because many keys have # explicit null values -- they'll need to be turned into empty lists # and re-saved, and setdefault() won't do that for an explicit null. @@ -261,7 +253,7 @@ def parse_watt(self, serialization: str, finalize: bool=True) -> None: # These objects have to be processed first, in order, as they depend # on each other. - watt_k8s_keys = ['service', 'endpoints', 'secret', 'ingressclasses', 'ingresses'] + watt_k8s_keys = self.manager.deps.sorted_watt_keys() # Then we add everything else to be processed. watt_k8s_keys += watt_k8s.keys() @@ -278,12 +270,7 @@ def parse_watt(self, serialization: str, finalize: bool=True) -> None: consul_endpoints = watt_consul.get('Endpoints', {}) for consul_rkey, consul_object in consul_endpoints.items(): - result = self.handle_consul_service(consul_rkey, consul_object) - - if result: - rkey, parsed_objects = result - - self.parse_object(parsed_objects, k8s=False, rkey=rkey) + self.handle_consul_service(consul_rkey, consul_object) except json.decoder.JSONDecodeError as e: self.aconf.post_error("%s: could not parse WATT: %s" % (self.location, e)) @@ -311,16 +298,8 @@ def load_pod_labels(self): self.aconf.pod_labels[pod_label_kv[0][0]] = pod_label_kv[0][1] self.logger.debug(f"Parsed pod labels: {self.aconf.pod_labels}") - def check_k8s_dup(self, kind: str, namespace: Optional[str], name: str) -> bool: - key = f"{kind}/{name}.{namespace}" - - if key in self.k8s_parsed: - # self.logger.debug(f"dropping K8s dup {key}") - return False - - # self.logger.info(f"remembering K8s {key}") - self.k8s_parsed[key] = True - return True + def sorted(self, key=lambda x: x.rkey): # returns an iterator, probably + return sorted(self.elements, key=key) def handle_k8s(self, raw_obj: dict) -> None: # self.logger.debug("handle_k8s obj %s" % dump_json(obj, pretty=True)) @@ -333,335 +312,12 @@ def handle_k8s(self, raw_obj: dict) -> None: return with self.manager.locations.push_reset(): - if self.k8s_processor.try_process(obj): - # Nothing else to do. - return - - handler_name = f'handle_k8s_{obj.kind.lower()}' - # self.logger.debug(f"looking for handler {handler_name} for K8s {kind} {name}") - handler = getattr(self, handler_name, None) - - if not handler: - self.logger.debug(f"{self.location}: skipping K8s {obj.gvk}") - return - - if not self.check_k8s_dup(obj.kind, obj.namespace, obj.name): - return - - result = handler(raw_obj) - - if result: - rkey, parsed_objects = result - - self.parse_object(parsed_objects, k8s=False, rkey=rkey) - - def parse_object(self, objects, k8s=False, rkey: Optional[str] = None, filename: Optional[str] = None): - if not filename: - filename = self.manager.locations.current.filename - - self.manager.locations.push(filename=filename) - - # self.logger.debug("PARSE_OBJECT: incoming %d" % len(objects)) - - for obj in objects: - # self.logger.debug("PARSE_OBJECT: checking %s" % obj) - - if k8s: - self.handle_k8s(obj) - else: - # if not obj: - # self.logger.debug("%s: empty object from %s" % (self.location, serialization)) - - self.manager.emit(NormalizedResource(obj, rkey=rkey)) - - self.manager.locations.pop() - - def sorted(self, key=lambda x: x.rkey): # returns an iterator, probably - return sorted(self.elements, key=key) - - def handle_k8s_ingressclass(self, k8s_object: AnyDict) -> HandlerResult: - metadata = k8s_object.get('metadata', None) - ingress_class_name = metadata.get('name') if metadata else None - ingress_class_spec = k8s_object.get('spec', None) - - # Important: IngressClass is not namespaced! - resource_identifier = f'{ingress_class_name}' - - skip = False - if not metadata: - self.logger.debug('ignoring K8s IngressClass with no metadata') - skip = True - if not ingress_class_name: - self.logger.debug('ignoring K8s IngressClass with no name') - skip = True - if not ingress_class_spec: - self.logger.debug('ignoring K8s IngressClass with no spec') - skip = True - - # We only want to deal with IngressClasses that belong to "spec.controller: getambassador.io/ingress-controller" - if ingress_class_spec.get('controller', '').lower() != 'getambassador.io/ingress-controller': - self.logger.debug(f'ignoring IngressClass {ingress_class_name} without controller - getambassador.io/ingress-controller') - skip = True - - if skip: - return None - - annotations = metadata.get('annotations', {}) - ambassador_id = annotations.get('getambassador.io/ambassador-id', 'default') - - # We don't want to deal with non-matching Ambassador IDs - if ambassador_id != Config.ambassador_id: - self.logger.debug(f'IngressClass {ingress_class_name} does not have Ambassador ID {Config.ambassador_id}, ignoring...') - return None - - # TODO: Do we intend to use this parameter in any way? - # `parameters` is of type TypedLocalObjectReference, - # meaning it links to another k8s resource in the same namespace. - # https://godoc.org/k8s.io/api/core/v1#TypedLocalObjectReference - # - # In this case, the resource referenced by TypedLocalObjectReference - # should not be namespaced, as IngressClass is a non-namespaced resource. - # - # It was designed to reference a CRD for this specific ingress-controller - # implementation... although usage is optional and not prescribed. - ingress_parameters = ingress_class_spec.get('parameters', {}) - - self.logger.debug(f'Handling IngressClass {ingress_class_name} with parameters {ingress_parameters}...') - - # Don't return this as we won't handle IngressClass downstream. - # Instead, save it in self.aconf.k8s_ingress_classes for reference in handle_k8s_ingress - self.aconf.k8s_ingress_classes[resource_identifier] = ingress_parameters - - return None - - def handle_k8s_ingress(self, k8s_object: AnyDict) -> HandlerResult: - if 'metadata' not in k8s_object: - self.logger.debug("ignoring K8s Ingress with no metadata") - return None - - metadata = k8s_object['metadata'] - - if 'name' not in metadata: - self.logger.debug("ignoring K8s Ingress with no name") - return None - - ingress_name = metadata['name'] - - if 'spec' not in k8s_object: - self.logger.debug(f"ignoring K8s Ingress {ingress_name} with no spec") - return None - - ingress_spec = k8s_object['spec'] - ingress_namespace = metadata.get('namespace') or 'default' - - metadata_labels: Optional[Dict[str, str]] = metadata.get('labels') - - resource_identifier = f'{ingress_name}.{ingress_namespace}' - - # we don't need an ingress without ingress class set to ambassador - annotations = metadata.get('annotations', {}) - ingress_class_name = ingress_spec.get('ingressClassName', '') - - ingress_class = self.aconf.k8s_ingress_classes.get(ingress_class_name, None) - has_ambassador_ingress_class_annotation = annotations.get('kubernetes.io/ingress.class', '').lower() == 'ambassador' - - # check the Ingress resource has either: - # - a `kubernetes.io/ingress.class: "ambassador"` annotation - # - a `spec.ingressClassName` that references an IngressClass with - # `spec.controller: getambassador.io/ingress-controller` - # - # also worth noting, the kube-apiserver might assign the `spec.ingressClassName` if unspecified - # and only 1 IngressClass has the following annotation: - # annotations: - # ingressclass.kubernetes.io/is-default-class: "true" - if (not has_ambassador_ingress_class_annotation) and (ingress_class is None): - self.logger.debug(f'ignoring Ingress {ingress_name} without annotation (kubernetes.io/ingress.class: "ambassador") or IngressClass controller (getambassador.io/ingress-controller)') - return None - - # Let's see if our Ingress resource has Ambassador annotations on it - annotations = metadata.get('annotations', {}) - ambassador_annotations = annotations.get('getambassador.io/config', None) - - parsed_ambassador_annotations = None - if ambassador_annotations is not None: - self.manager.locations.mark_annotated() - - try: - parsed_ambassador_annotations = parse_yaml(ambassador_annotations) - except yaml.error.YAMLError as e: - self.logger.debug("could not parse YAML: %s" % e) - - ambassador_id = annotations.get('getambassador.io/ambassador-id', 'default') - - # We don't want to deal with non-matching Ambassador IDs - if ambassador_id != Config.ambassador_id: - self.logger.debug(f"Ingress {ingress_name} does not have Ambassador ID {Config.ambassador_id}, ignoring...") - return None - - self.logger.debug(f"Handling Ingress {ingress_name}...") - # We will translate the Ingress resource into Hosts and Mappings, - # but keep a reference to the k8s resource in aconf for debugging and stats - self.aconf.k8s_ingresses[resource_identifier] = k8s_object - - ingress_tls = ingress_spec.get('tls', []) - for tls_count, tls in enumerate(ingress_tls): - - tls_secret = tls.get('secretName', None) - if tls_secret is not None: - - for host_count, host in enumerate(tls.get('hosts', ['*'])): - tls_unique_identifier = f"{ingress_name}-{tls_count}-{host_count}" - - ingress_host: Dict[str, Any] = { - 'apiVersion': 'getambassador.io/v2', - 'kind': 'Host', - 'metadata': { - 'name': tls_unique_identifier, - 'namespace': ingress_namespace - }, - 'spec': { - 'ambassador_id': [ambassador_id], - 'hostname': host, - 'acmeProvider': { - 'authority': 'none' - }, - 'tlsSecret': { - 'name': tls_secret - }, - 'requestPolicy': { - 'insecure': { - 'action': 'Route' - } - } - } - } - - if metadata_labels: - ingress_host['metadata']['labels'] = metadata_labels - - self.logger.debug(f"Generated Host from ingress {ingress_name}: {ingress_host}") - self.handle_k8s(ingress_host) - - # parse ingress.spec.defaultBackend - # using ingress.spec.backend as a fallback, for older versions of the Ingress resource. - default_backend = ingress_spec.get('defaultBackend', ingress_spec.get('backend', {})) - db_service_name = default_backend.get('serviceName', None) - db_service_port = default_backend.get('servicePort', None) - if db_service_name is not None and db_service_port is not None: - db_mapping_identifier = f"{ingress_name}-default-backend" - - default_backend_mapping: AnyDict = { - 'apiVersion': 'getambassador.io/v2', - 'kind': 'Mapping', - 'metadata': { - 'name': db_mapping_identifier, - 'namespace': ingress_namespace - }, - 'spec': { - 'ambassador_id': ambassador_id, - 'prefix': '/', - 'service': f'{db_service_name}.{ingress_namespace}:{db_service_port}' - } - } - - if metadata_labels: - default_backend_mapping['metadata']['labels'] = metadata_labels - - self.logger.debug(f"Generated mapping from Ingress {ingress_name}: {default_backend_mapping}") - self.handle_k8s(default_backend_mapping) - - # parse ingress.spec.rules - ingress_rules = ingress_spec.get('rules', []) - for rule_count, rule in enumerate(ingress_rules): - rule_http = rule.get('http', {}) - - rule_host = rule.get('host', None) - - http_paths = rule_http.get('paths', []) - for path_count, path in enumerate(http_paths): - path_backend = path.get('backend', {}) - path_type = path.get('pathType', 'ImplementationSpecific') - - service_name = path_backend.get('serviceName', None) - service_port = path_backend.get('servicePort', None) - path_location = path.get('path', '/') - - if not service_name or not service_port or not path_location: - continue - - unique_suffix = f"{rule_count}-{path_count}" - mapping_identifier = f"{ingress_name}-{unique_suffix}" - - # For cases where `pathType: Exact`, - # otherwise `Prefix` and `ImplementationSpecific` are handled as regular Mapping prefixes - is_exact_prefix = True if path_type == 'Exact' else False - - path_mapping: Dict[str, Any] = { - 'apiVersion': 'getambassador.io/v2', - 'kind': 'Mapping', - 'metadata': { - 'name': mapping_identifier, - 'namespace': ingress_namespace - }, - 'spec': { - 'ambassador_id': ambassador_id, - 'prefix': path_location, - 'prefix_exact': is_exact_prefix, - 'precedence': 1 if is_exact_prefix else 0, # Make sure exact paths are evaluated before prefix - 'service': f'{service_name}.{ingress_namespace}:{service_port}' - } - } - - if metadata_labels: - path_mapping['metadata']['labels'] = metadata_labels - - if rule_host is not None: - if rule_host.startswith('*.'): - # Ingress allow specifying hosts with a single wildcard as the first label in the hostname. - # Transform the rule_host into a host_regex: - # *.star.com becomes ^[a-z0-9]([-a-z0-9]*[a-z0-9])?\.star\.com$ - path_mapping['spec']['host'] = rule_host\ - .replace('.', '\\.')\ - .replace('*', '^[a-z0-9]([-a-z0-9]*[a-z0-9])?', 1) + '$' - path_mapping['spec']['host_regex'] = True - else: - path_mapping['spec']['host'] = rule_host - - self.logger.debug(f"Generated mapping from Ingress {ingress_name}: {path_mapping}") - self.handle_k8s(path_mapping) - - # let's make arrangements to update Ingress' status now - if not self.manager.ambassador_service: - self.logger.error(f"Unable to update Ingress {ingress_name}'s status, could not find Ambassador service") - else: - ingress_status = self.manager.ambassador_service.status - - if ingress_status: - kind = k8s_object.get('kind') - assert(kind) - - ingress_status_update = (kind, ingress_namespace, ingress_status) - self.logger.debug(f"Updating Ingress {ingress_name} status to {ingress_status_update}") - self.aconf.k8s_status_updates[f'{ingress_name}.{ingress_namespace}'] = ingress_status_update - - if parsed_ambassador_annotations is not None: - # Copy metadata_labels to parsed annotations, if need be. - if metadata_labels: - for p in parsed_ambassador_annotations: - if p.get('metadata_labels') is None: - p['metadata_labels'] = metadata_labels - - # Force validation for all of these objects. - for p in parsed_ambassador_annotations: - p['_force_validation'] = True - - return resource_identifier, parsed_ambassador_annotations - - return None + if not self.k8s_processor.try_process(obj): + self.logger.debug(f"{self.location}: skipping K8s {obj.gvk}") # Handler for Consul services def handle_consul_service(self, - consul_rkey: str, consul_object: AnyDict) -> HandlerResult: + consul_rkey: str, consul_object: AnyDict) -> None: # resource_identifier = f'consul-{consul_rkey}' endpoints = consul_object.get('Endpoints', []) @@ -670,7 +326,7 @@ def handle_consul_service(self, if len(endpoints) < 1: # Bzzt. self.logger.debug(f"ignoring Consul service {name} with no Endpoints") - return None + return # We can turn this directly into an Ambassador Service resource, since Consul keeps # services and endpoints together (as it should!!). @@ -710,7 +366,5 @@ def handle_consul_service(self, rkey=f"consul-{name}-{spec['datacenter']}", )) - return None - def finalize(self) -> None: self.k8s_processor.finalize() diff --git a/python/ambassador/fetch/ingress.py b/python/ambassador/fetch/ingress.py new file mode 100644 index 0000000000..5a8fa140a0 --- /dev/null +++ b/python/ambassador/fetch/ingress.py @@ -0,0 +1,241 @@ +from typing import ClassVar, FrozenSet + +from ..config import Config + +from .dependency import IngressClassesDependency, SecretDependency, ServiceDependency +from .k8sobject import KubernetesGVK, KubernetesObject +from .k8sprocessor import ManagedKubernetesProcessor +from .resource import NormalizedResource, ResourceManager + + +class IngressClassProcessor (ManagedKubernetesProcessor): + + CONTROLLER: ClassVar[str] = 'getambassador.io/ingress-controller' + + ingress_classes_dep: IngressClassesDependency + + def __init__(self, manager: ResourceManager) -> None: + super().__init__(manager) + + self.ingress_classes_dep = self.deps.provide(IngressClassesDependency) + + def kinds(self) -> FrozenSet[KubernetesGVK]: + return frozenset([ + KubernetesGVK('networking.k8s.io/v1beta1', 'IngressClass'), + KubernetesGVK('networking.k8s.io/v1', 'IngressClass'), + ]) + + def _process(self, obj: KubernetesObject) -> None: + # We only want to deal with IngressClasses that belong to "spec.controller: getambassador.io/ingress-controller" + if obj.spec.get('controller', '').lower() != self.CONTROLLER: + self.logger.debug(f'ignoring IngressClass {obj.name} without controller - getambassador.io/ingress-controller') + return + + if obj.ambassador_id != Config.ambassador_id: + self.logger.debug(f'IngressClass {obj.name} does not have Ambassador ID {Config.ambassador_id}, ignoring...') + return + + # TODO: Do we intend to use this parameter in any way? + # `parameters` is of type TypedLocalObjectReference, + # meaning it links to another k8s resource in the same namespace. + # https://godoc.org/k8s.io/api/core/v1#TypedLocalObjectReference + # + # In this case, the resource referenced by TypedLocalObjectReference + # should not be namespaced, as IngressClass is a non-namespaced resource. + # + # It was designed to reference a CRD for this specific ingress-controller + # implementation... although usage is optional and not prescribed. + ingress_parameters = obj.spec.get('parameters', {}) + + self.logger.debug(f'Handling IngressClass {obj.name} with parameters {ingress_parameters}...') + self.aconf.incr_count('k8s_ingress_class') + + # Don't emit this directly. We use it when we handle ingresses below. If + # we want to use the parameters, we should add them to this dependency + # type. + self.ingress_classes_dep.ingress_classes.add(obj.name) + + +class IngressProcessor (ManagedKubernetesProcessor): + + service_dep: ServiceDependency + ingress_classes_dep: IngressClassesDependency + + def __init__(self, manager: ResourceManager) -> None: + super().__init__(manager) + + self.deps.want(SecretDependency) + self.service_dep = self.deps.want(ServiceDependency) + self.ingress_classes_dep = self.deps.want(IngressClassesDependency) + + def kinds(self) -> FrozenSet[KubernetesGVK]: + return frozenset([ + KubernetesGVK('extensions/v1beta1', 'Ingress'), + KubernetesGVK('networking.k8s.io/v1beta1', 'Ingress'), + KubernetesGVK('networking.k8s.io/v1', 'Ingress'), + ]) + + def _update_status(self, obj: KubernetesObject) -> None: + service_status = None + + if not self.service_dep.ambassador_service or not self.service_dep.ambassador_service.name: + self.logger.error(f"Unable to set Ingress {obj.name}'s load balancer, could not find Ambassador service") + else: + service_status = self.service_dep.ambassador_service.status + + if obj.status != service_status: + if service_status: + status_update = (obj.gvk.kind, obj.namespace, service_status) + self.logger.debug(f"Updating Ingress {obj.name} status to {status_update}") + self.aconf.k8s_status_updates[f'{obj.name}.{obj.namespace}'] = status_update + else: + self.logger.debug(f"Not reconciling Ingress {obj.name}: observed and current statuses are in sync") + + def _process(self, obj: KubernetesObject) -> None: + ingress_class_name = obj.spec.get('ingressClassName', '') + + has_ingress_class = ingress_class_name in self.ingress_classes_dep.ingress_classes + has_ambassador_ingress_class_annotation = obj.annotations.get('kubernetes.io/ingress.class', '').lower() == 'ambassador' + + # check the Ingress resource has either: + # - a `kubernetes.io/ingress.class: "ambassador"` annotation + # - a `spec.ingressClassName` that references an IngressClass with + # `spec.controller: getambassador.io/ingress-controller` + # + # also worth noting, the kube-apiserver might assign the `spec.ingressClassName` if unspecified + # and only 1 IngressClass has the following annotation: + # annotations: + # ingressclass.kubernetes.io/is-default-class: "true" + if not (has_ingress_class or has_ambassador_ingress_class_annotation): + self.logger.debug(f'ignoring Ingress {obj.name} without annotation (kubernetes.io/ingress.class: "ambassador") or IngressClass controller (getambassador.io/ingress-controller)') + return + + # We don't want to deal with non-matching Ambassador IDs + if obj.ambassador_id != Config.ambassador_id: + self.logger.debug(f"Ingress {obj.name} does not have Ambassador ID {Config.ambassador_id}, ignoring...") + return + + self.logger.debug(f"Handling Ingress {obj.name}...") + self.aconf.incr_count('k8s_ingress') + + ingress_tls = obj.spec.get('tls', []) + for tls_count, tls in enumerate(ingress_tls): + + tls_secret = tls.get('secretName', None) + if tls_secret is not None: + + for host_count, host in enumerate(tls.get('hosts', ['*'])): + tls_unique_identifier = f"{obj.name}-{tls_count}-{host_count}" + + spec = { + 'ambassador_id': [obj.ambassador_id], + 'hostname': host, + 'acmeProvider': { + 'authority': 'none' + }, + 'tlsSecret': { + 'name': tls_secret + }, + 'requestPolicy': { + 'insecure': { + 'action': 'Route' + } + } + } + + ingress_host = NormalizedResource.from_data( + 'Host', + tls_unique_identifier, + namespace=obj.namespace, + labels=obj.labels, + spec=spec, + ) + + self.logger.debug(f"Generated Host from ingress {obj.name}: {ingress_host}") + self.manager.emit(ingress_host) + + # parse ingress.spec.defaultBackend + # using ingress.spec.backend as a fallback, for older versions of the Ingress resource. + default_backend = obj.spec.get('defaultBackend', obj.spec.get('backend', {})) + db_service_name = default_backend.get('serviceName', None) + db_service_port = default_backend.get('servicePort', None) + if db_service_name is not None and db_service_port is not None: + db_mapping_identifier = f"{obj.name}-default-backend" + + default_backend_mapping = NormalizedResource.from_data( + 'Mapping', + db_mapping_identifier, + namespace=obj.namespace, + labels=obj.labels, + spec={ + 'ambassador_id': obj.ambassador_id, + 'prefix': '/', + 'service': f'{db_service_name}.{obj.namespace}:{db_service_port}' + }, + ) + + self.logger.debug(f"Generated mapping from Ingress {obj.name}: {default_backend_mapping}") + self.manager.emit(default_backend_mapping) + + # parse ingress.spec.rules + ingress_rules = obj.spec.get('rules', []) + for rule_count, rule in enumerate(ingress_rules): + rule_http = rule.get('http', {}) + + rule_host = rule.get('host', None) + + http_paths = rule_http.get('paths', []) + for path_count, path in enumerate(http_paths): + path_backend = path.get('backend', {}) + path_type = path.get('pathType', 'ImplementationSpecific') + + service_name = path_backend.get('serviceName', None) + service_port = path_backend.get('servicePort', None) + path_location = path.get('path', '/') + + if not service_name or not service_port or not path_location: + continue + + unique_suffix = f"{rule_count}-{path_count}" + mapping_identifier = f"{obj.name}-{unique_suffix}" + + # For cases where `pathType: Exact`, + # otherwise `Prefix` and `ImplementationSpecific` are handled as regular Mapping prefixes + is_exact_prefix = True if path_type == 'Exact' else False + + spec = { + 'ambassador_id': obj.ambassador_id, + 'prefix': path_location, + 'prefix_exact': is_exact_prefix, + 'precedence': 1 if is_exact_prefix else 0, # Make sure exact paths are evaluated before prefix + 'service': f'{service_name}.{obj.namespace}:{service_port}' + } + + if rule_host is not None: + if rule_host.startswith('*.'): + # Ingress allow specifying hosts with a single wildcard as the first label in the hostname. + # Transform the rule_host into a host_regex: + # *.star.com becomes ^[a-z0-9]([-a-z0-9]*[a-z0-9])?\.star\.com$ + spec['host'] = rule_host\ + .replace('.', '\\.')\ + .replace('*', '^[a-z0-9]([-a-z0-9]*[a-z0-9])?', 1) + '$' + spec['host_regex'] = True + else: + spec['host'] = rule_host + + path_mapping = NormalizedResource.from_data( + 'Mapping', + mapping_identifier, + namespace=obj.namespace, + labels=obj.labels, + spec=spec, + ) + + self.logger.debug(f"Generated mapping from Ingress {obj.name}: {path_mapping}") + self.manager.emit(path_mapping) + + # let's make arrangements to update Ingress' status now + self._update_status(obj) + + # Let's see if our Ingress resource has Ambassador annotations on it + self.manager.emit_annotated(NormalizedResource.from_kubernetes_object_annotation(obj)) diff --git a/python/ambassador/fetch/k8sprocessor.py b/python/ambassador/fetch/k8sprocessor.py index bd3f1d8c22..013945c68b 100644 --- a/python/ambassador/fetch/k8sprocessor.py +++ b/python/ambassador/fetch/k8sprocessor.py @@ -5,6 +5,7 @@ from ..config import Config +from .dependency import DependencyInjector from .k8sobject import KubernetesGVK, KubernetesObjectScope, KubernetesObjectKey, KubernetesObject from .resource import ResourceManager @@ -71,6 +72,10 @@ def aconf(self) -> Config: def logger(self) -> logging.Logger: return self.manager.logger + @property + def deps(self) -> DependencyInjector: + return self.manager.deps.for_instance(self) + class AggregateKubernetesProcessor (KubernetesProcessor): """ diff --git a/python/ambassador/fetch/knative.py b/python/ambassador/fetch/knative.py index a0006613a7..90dae59f62 100644 --- a/python/ambassador/fetch/knative.py +++ b/python/ambassador/fetch/knative.py @@ -8,9 +8,10 @@ from ..config import Config +from .dependency import ServiceDependency from .k8sobject import KubernetesGVK, KubernetesObject from .k8sprocessor import ManagedKubernetesProcessor -from .resource import NormalizedResource +from .resource import NormalizedResource, ResourceManager class KnativeIngressProcessor (ManagedKubernetesProcessor): @@ -20,6 +21,13 @@ class KnativeIngressProcessor (ManagedKubernetesProcessor): INGRESS_CLASS: ClassVar[str] = 'ambassador.ingress.networking.knative.dev' + service_dep: ServiceDependency + + def __init__(self, manager: ResourceManager): + super().__init__(manager) + + self.service_dep = self.deps.want(ServiceDependency) + def kinds(self) -> FrozenSet[KubernetesGVK]: return frozenset([KubernetesGVK.for_knative_networking('Ingress')]) @@ -141,7 +149,7 @@ def _update_status(self, obj: KubernetesObject) -> None: # out-of-cluster ingress to access the service. current_lb_domain = None - if not self.manager.ambassador_service or not self.manager.ambassador_service.name: + if not self.service_dep.ambassador_service or not self.service_dep.ambassador_service.name: self.logger.warning(f"Unable to set Knative {obj.kind} {obj.name}'s load balancer, could not find Ambassador service") else: # TODO: It is technically possible to use a domain other than @@ -149,7 +157,7 @@ def _update_status(self, obj: KubernetesObject) -> None: # the relevant domain by doing a DNS lookup on # kubernetes.default.svc, but this problem appears elsewhere in the # code as well and probably should just be fixed all at once. - current_lb_domain = f"{self.manager.ambassador_service.name}.{self.manager.ambassador_service.namespace}.svc.cluster.local" + current_lb_domain = f"{self.service_dep.ambassador_service.name}.{self.service_dep.ambassador_service.namespace}.svc.cluster.local" observed_ingress: Dict[str, Any] = next(iter(obj.status.get('privateLoadBalancer', {}).get('ingress', [])), {}) observed_lb_domain = observed_ingress.get('domainInternal') @@ -160,7 +168,7 @@ def _update_status(self, obj: KubernetesObject) -> None: status = self._make_status(generation=obj.generation, lb_domain=current_lb_domain) if status: - status_update = (obj.gvk.domain, obj.namespace or 'default', status) + status_update = (obj.gvk.domain, obj.namespace, status) self.logger.info(f"Updating Knative {obj.kind} {obj.name} status to {status_update}") self.aconf.k8s_status_updates[f"{obj.name}.{obj.namespace}"] = status_update else: diff --git a/python/ambassador/fetch/resource.py b/python/ambassador/fetch/resource.py index f8bdbc8520..8b754c314e 100644 --- a/python/ambassador/fetch/resource.py +++ b/python/ambassador/fetch/resource.py @@ -8,6 +8,7 @@ from ..config import ACResource, Config from ..utils import dump_yaml, parse_yaml, dump_json +from .dependency import DependencyManager from .k8sobject import KubernetesObjectScope, KubernetesObject from .location import LocationManager @@ -103,15 +104,15 @@ class ResourceManager: logger: logging.Logger aconf: Config + deps: DependencyManager locations: LocationManager - ambassador_service: Optional[KubernetesObject] elements: List[ACResource] - def __init__(self, logger: logging.Logger, aconf: Config): + def __init__(self, logger: logging.Logger, aconf: Config, deps: DependencyManager): self.logger = logger self.aconf = aconf + self.deps = deps self.locations = LocationManager() - self.ambassador_service = None self.elements = [] @property diff --git a/python/ambassador/fetch/secret.py b/python/ambassador/fetch/secret.py index f23646e7d3..11417c8729 100644 --- a/python/ambassador/fetch/secret.py +++ b/python/ambassador/fetch/secret.py @@ -2,9 +2,10 @@ from ..config import Config +from .dependency import SecretDependency from .k8sobject import KubernetesGVK, KubernetesObject from .k8sprocessor import ManagedKubernetesProcessor -from .resource import NormalizedResource +from .resource import NormalizedResource, ResourceManager class SecretProcessor (ManagedKubernetesProcessor): @@ -27,6 +28,11 @@ class SecretProcessor (ManagedKubernetesProcessor): 'root-cert.pem', ] + def __init__(self, manager: ResourceManager) -> None: + super().__init__(manager) + + self.deps.provide(SecretDependency) + def kinds(self) -> FrozenSet[KubernetesGVK]: return frozenset([KubernetesGVK('v1', 'Secret')]) diff --git a/python/ambassador/fetch/service.py b/python/ambassador/fetch/service.py index cb02e7c4ee..f26aa774ef 100644 --- a/python/ambassador/fetch/service.py +++ b/python/ambassador/fetch/service.py @@ -4,6 +4,7 @@ from ..config import Config +from .dependency import ServiceDependency from .k8sobject import KubernetesGVK, KubernetesObjectKey, KubernetesObject from .k8sprocessor import AggregateKubernetesProcessor, ManagedKubernetesProcessor from .resource import NormalizedResource, ResourceManager @@ -41,12 +42,14 @@ class InternalServiceProcessor (ManagedKubernetesProcessor): ServiceProcessor aggregate class. """ + service_dep: ServiceDependency helm_chart: Optional[str] discovered_services: Dict[KubernetesObjectKey, KubernetesObject] def __init__(self, manager: ResourceManager) -> None: super().__init__(manager) + self.service_dep = self.deps.provide(ServiceDependency) self.helm_chart = None self.discovered_services = {} @@ -88,7 +91,7 @@ def _process(self, obj: KubernetesObject) -> None: if self._is_ambassador_service(obj.labels, obj.spec.get('selector', {})): self.logger.debug(f"Found Ambassador service: {obj.name}") - self.manager.ambassador_service = obj + self.service_dep.ambassador_service = obj # Although we can't emit this resource immediately, we can handle # anything added as an annotation. diff --git a/python/ambassador/ir/ir.py b/python/ambassador/ir/ir.py index 06f29d64ff..c42c39cc24 100644 --- a/python/ambassador/ir/ir.py +++ b/python/ambassador/ir/ir.py @@ -1005,8 +1005,8 @@ def features(self) -> Dict[str, Any]: od['cluster_ingress_count'] = 0 # Provided for backward compatibility only. od['knative_ingress_count'] = self.aconf.get_count('knative_ingress') - od['k8s_ingress_count'] = len(self.aconf.k8s_ingresses) - od['k8s_ingress_class_count'] = len(self.aconf.k8s_ingress_classes) + od['k8s_ingress_count'] = self.aconf.get_count('k8s_ingress') + od['k8s_ingress_class_count'] = self.aconf.get_count('k8s_ingress_class') extauth = False extauth_proto: Optional[str] = None diff --git a/python/kat/harness.py b/python/kat/harness.py index 58de87aebf..c6d1a531ed 100755 --- a/python/kat/harness.py +++ b/python/kat/harness.py @@ -143,11 +143,7 @@ def kube_client_version(version_json=None): return None -def is_knative(): - # Skip KNative immediately for run_mode local. - if RUN_MODE == 'local': - return False - +def is_kube_server_client_compatible(debug_desc: str, requested_server_version: str, requested_client_version: str) -> bool: is_cluster_compatible = True kube_json = kube_version_json() @@ -155,26 +151,38 @@ def is_knative(): client_version = kube_client_version(kube_json) if server_version: - if version.parse(server_version) < version.parse('1.14'): - print(f"server version {server_version} is incompatible with Knative") + if version.parse(server_version) < version.parse(requested_server_version): + print(f"server version {server_version} is incompatible with {debug_desc}") is_cluster_compatible = False else: - print(f"server version {server_version} is compatible with Knative") + print(f"server version {server_version} is compatible with {debug_desc}") else: print("could not determine Kubernetes server version?") if client_version: - if version.parse(client_version) < version.parse('1.14'): - print(f"client version {client_version} is incompatible with Knative") + if version.parse(client_version) < version.parse(requested_client_version): + print(f"client version {client_version} is incompatible with {debug_desc}") is_cluster_compatible = False else: - print(f"client version {client_version} is compatible with Knative") + print(f"client version {client_version} is compatible with {debug_desc}") else: print("could not determine Kubernetes client version?") return is_cluster_compatible +def is_ingress_class_compatible() -> bool: + return is_kube_server_client_compatible('IngressClass', '1.18', '1.14') + + +def is_knative_compatible() -> bool: + # Skip KNative immediately for run_mode local. + if RUN_MODE == 'local': + return False + + return is_kube_server_client_compatible('Knative', '1.14', '1.14') + + def get_digest(data: str) -> str: s = sha256() s.update(data.encode('utf-8')) @@ -1491,7 +1499,7 @@ def _setup_k8s(self, selected): CRDS = load_manifest("crds") input_crds = CRDS - if is_knative(): + if is_knative_compatible(): KNATIVE_SERVING_CRDS = load_manifest("knative_serving_crds") input_crds += KNATIVE_SERVING_CRDS diff --git a/python/tests/t_ingress.py b/python/tests/t_ingress.py index cb1217efa8..bfb73b8dd2 100644 --- a/python/tests/t_ingress.py +++ b/python/tests/t_ingress.py @@ -5,7 +5,7 @@ import subprocess import time -from kat.harness import Query +from kat.harness import Query, is_ingress_class_compatible from abstract_tests import AmbassadorTest, HTTP, ServiceType from kat.utils import namespace_manifest @@ -341,3 +341,100 @@ def check(self): ingress_out, _ = ingress_run.communicate() ingress_json = json.loads(ingress_out) assert ingress_json['status'] == self.status_update, f"Expected Ingress status to be {self.status_update}, got {ingress_json['status']} instead" + + +class IngressStatusTestWithIngressClass(AmbassadorTest): + status_update = { + "loadBalancer": { + "ingress": [{ + "ip": "42.42.42.42" + }] + } + } + + def init(self): + self.target = HTTP() + + if not is_ingress_class_compatible(): + self.xfail = 'IngressClass is not supported in this cluster' + + def manifests(self) -> str: + return """ +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRole +metadata: + name: {self.name.k8s}-ext +rules: +- apiGroups: ["networking.k8s.io"] + resources: ["ingressclasses"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: {self.name.k8s}-ext +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {self.name.k8s}-ext +subjects: +- kind: ServiceAccount + name: {self.path.k8s} + namespace: {self.namespace} +--- +apiVersion: networking.k8s.io/v1beta1 +kind: IngressClass +metadata: + annotations: + getambassador.io/ambassador-id: {self.ambassador_id} + name: {self.name.k8s} +spec: + controller: getambassador.io/ingress-controller +--- +apiVersion: networking.k8s.io/v1beta1 +kind: Ingress +metadata: + annotations: + getambassador.io/ambassador-id: {self.ambassador_id} + name: {self.name.k8s} +spec: + ingressClassName: {self.name.k8s} + rules: + - http: + paths: + - backend: + serviceName: {self.target.path.k8s} + servicePort: 80 + path: /{self.name}/ +""" + super().manifests() + + def queries(self): + if sys.platform != 'darwin': + text = json.dumps(self.status_update) + + update_cmd = ['kubestatus', 'Service', '-n', 'default', '-f', f'metadata.name={self.name.k8s}', '-u', '/dev/fd/0'] + subprocess.run(update_cmd, input=text.encode('utf-8'), timeout=10) + # If you run these tests individually, the time between running kubestatus + # and the ingress resource actually getting updated is longer than the + # time spent waiting for resources to be ready, so this test will fail (most of the time) + time.sleep(1) + + yield Query(self.url(self.name + "/")) + yield Query(self.url(f'need-normalization/../{self.name}/')) + + def check(self): + if sys.platform == 'darwin': + pytest.xfail('not supported on Darwin') + + for r in self.results: + if r.backend: + assert r.backend.name == self.target.path.k8s, (r.backend.name, self.target.path.k8s) + assert r.backend.request.headers['x-envoy-original-path'][0] == f'/{self.name}/' + + # check for Ingress IP here + ingress_cmd = ["kubectl", "get", "-n", "default", "-o", "json", "ingress", self.path.k8s] + ingress_run = subprocess.Popen(ingress_cmd, stdout=subprocess.PIPE) + ingress_out, _ = ingress_run.communicate() + ingress_json = json.loads(ingress_out) + assert ingress_json['status'] == self.status_update, f"Expected Ingress status to be {self.status_update}, got {ingress_json['status']} instead" diff --git a/python/tests/test_fetch.py b/python/tests/test_fetch.py index dc96aee1a2..e06bfa1996 100644 --- a/python/tests/test_fetch.py +++ b/python/tests/test_fetch.py @@ -13,6 +13,7 @@ from ambassador import Config from ambassador.fetch import ResourceFetcher +from ambassador.fetch.dependency import DependencyManager, ServiceDependency, SecretDependency, IngressClassesDependency from ambassador.fetch.location import LocationManager from ambassador.fetch.resource import NormalizedResource, ResourceManager from ambassador.fetch.k8sobject import ( @@ -221,7 +222,7 @@ class TestAmbassadorProcessor: def test_mapping(self): aconf = Config() - mgr = ResourceManager(logger, aconf) + mgr = ResourceManager(logger, aconf, DependencyManager([])) assert AmbassadorProcessor(mgr).try_process(valid_mapping) assert len(mgr.elements) == 1 @@ -241,7 +242,7 @@ def test_mapping(self): def test_mapping_v1(self): aconf = Config() - mgr = ResourceManager(logger, aconf) + mgr = ResourceManager(logger, aconf, DependencyManager([])) assert AmbassadorProcessor(mgr).try_process(valid_mapping_v1) assert len(mgr.elements) == 1 @@ -316,7 +317,9 @@ def test_count(self): class TestServiceAnnotations: def setup(self): - self.manager = ResourceManager(logger, Config()) + self.manager = ResourceManager(logger, Config(), DependencyManager([ + ServiceDependency(), + ])) self.processor = ServiceProcessor(self.manager) def test_no_ambassador_annotation(self): @@ -378,5 +381,43 @@ def test_valid_annotation(self): assert self.manager.elements[0].get(key) == value +class TestDependencyManager: + + def setup(self): + self.deps = DependencyManager([ + SecretDependency(), + ServiceDependency(), + IngressClassesDependency(), + ]) + + def test_cyclic(self): + a = self.deps.for_instance(object()) + b = self.deps.for_instance(object()) + c = self.deps.for_instance(object()) + + a.provide(SecretDependency) + a.want(ServiceDependency) + b.provide(ServiceDependency) + b.want(IngressClassesDependency) + c.provide(IngressClassesDependency) + c.want(SecretDependency) + + with pytest.raises(ValueError): + self.deps.sorted_watt_keys() + + def test_sort(self): + a = self.deps.for_instance(object()) + b = self.deps.for_instance(object()) + c = self.deps.for_instance(object()) + + a.want(SecretDependency) + a.want(ServiceDependency) + a.provide(IngressClassesDependency) + b.provide(SecretDependency) + c.provide(ServiceDependency) + + assert self.deps.sorted_watt_keys() == ['secret', 'service', 'ingressclasses'] + + if __name__ == '__main__': pytest.main(sys.argv) diff --git a/python/tests/test_knative.py b/python/tests/test_knative.py index 2877c7c599..62c79a0a43 100644 --- a/python/tests/test_knative.py +++ b/python/tests/test_knative.py @@ -5,7 +5,7 @@ import pytest -from kat.harness import is_knative +from kat.harness import is_knative_compatible from kat.harness import load_manifest from ambassador import Config, IR from ambassador.fetch import ResourceFetcher @@ -153,7 +153,7 @@ def test_knative_counters(): def test_knative(): - if is_knative(): + if is_knative_compatible(): knative_test = KnativeTesting() knative_test.test_knative() else: