Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a resource processor for ingresses and ingress classes #3143

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ Please see the [Envoy documentation](https://www.envoyproxy.io/docs/envoy/latest
- Bugfix: Make sure that `labels` specifying headers with extra attributes are correctly supported again ([#3137])
- Bugfix: Support Consul services when the `ConsulResolver` and the `Mapping` aren't in the same namespace, and legacy mode is not enabled.
- Feature: Ambassador now reads the ENVOY_CONCURRENCY environment variable to optionally set the [--concurrency](https://www.envoyproxy.io/docs/envoy/latest/operations/cli#cmdoption-concurrency) command line option when launching Envoy. This controls the number of worker threads used to serve requests and can be used to fine-tune system resource usage.
- Bugfix: Fix failure to start when one or more IngressClasses are present in a cluster ([#3142]).
- Bugfix: Prevent potential reconcile loop when updating the status of an Ingress.

[#3137]: https://github.com/datawire/ambassador/issues/3137
[#3142]: https://github.com/datawire/ambassador/issues/3142

## [1.10.0] January 04, 2021
[1.10.0]: https://github.com/datawire/ambassador/compare/v1.9.1...v1.10.0
Expand Down
1 change: 1 addition & 0 deletions builder/sync-excludes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ __pycache__
.pytest_cache
.mypy_cache
.dmypy.json
.coverage
go.sum

bin/
Expand Down
2 changes: 0 additions & 2 deletions python/ambassador/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ def __init__(self, schema_dir_path: Optional[str]=None) -> None:

self.logger.debug("SCHEMA DIR %s" % os.path.abspath(self.schema_dir_path))
self.k8s_status_updates: Dict[str, Tuple[str, str, Optional[Dict[str, Any]]]] = {} # Tuple is (name, namespace, status_json)
self.k8s_ingresses: Dict[str, Any] = {}
self.k8s_ingress_classes: Dict[str, Any] = {}
self.pod_labels: Dict[str, str] = {}
self._reset()

Expand Down
183 changes: 183 additions & 0 deletions python/ambassador/fetch/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from typing import Any, Collection, Iterator, Mapping, MutableSet, Optional, Protocol, Sequence, Type, TypeVar

from collections import defaultdict
import dataclasses

from .k8sobject import KubernetesObject


class Dependency (Protocol):
"""
Dependencies link information provided by processors of a given Watt
invocation to other processors that need the processed result. This results
in an ordering of keys so that processors can be dependent on each other
without direct knowledge of where data is coming from.
"""

def watt_key(self) -> str: ...


class ServiceDependency (Dependency):
"""
A dependency that exposes information about the Kubernetes service for
Ambassador itself.
"""

ambassador_service: Optional[KubernetesObject]

def __init__(self) -> None:
self.ambassador_service = None

def watt_key(self) -> str:
return 'service'


class SecretDependency (Dependency):
"""
A dependency that is satisfied once secret information has been mapped and
emitted.
"""

def watt_key(self) -> str:
return 'secret'


class IngressClassesDependency (Dependency):
"""
A dependency that provides the list of ingress classes that are valid (i.e.,
have the proper controller) for this cluster.
"""

ingress_classes: MutableSet[str]

def __init__(self):
self.ingress_classes = set()

def watt_key(self) -> str:
return 'ingressclasses'


D = TypeVar('D', bound=Dependency)


class DependencyMapping (Protocol):

def __contains__(self, key: Type[D]) -> bool: ...
def __getitem__(self, key: Type[D]) -> D: ...


class DependencyInjector:
"""
Each processor instance is provided with a dependency injector that allows
it to declare what dependencies it provides as part of its processing and
what dependencies it wants to do its processing.

Note that dependencies need not be fulfilled; for example, nothing may
provide information about the Ambassador service or the list of valid
ingress classes. Processors should be prepared to deal with the absence of
valid data when they run.
"""

wants: MutableSet[Type[Dependency]]
provides: MutableSet[Type[Dependency]]
deps: DependencyMapping

def __init__(self, deps: DependencyMapping) -> None:
self.wants = set()
self.provides = set()
self.deps = deps

def want(self, cls: Type[D]) -> D:
self.wants.add(cls)
return self.deps[cls]

def provide(self, cls: Type[D]) -> D:
self.provides.add(cls)
return self.deps[cls]


class DependencyGraph:
"""
Once dependency relationships are known, this class provides the ability to
link them holistically and traverse them in topological order. It is most
useful in the context of the sorted_watt_keys() method of the
DependencyManager.
"""

@dataclasses.dataclass
class Vertex:
out: MutableSet[Any]
in_count: int

vertices: Mapping[Any, Vertex]

def __init__(self) -> None:
self.vertices = defaultdict(lambda: DependencyGraph.Vertex(out=set(), in_count=0))

def connect(self, a: Any, b: Any) -> None:
if b not in self.vertices[a].out:
self.vertices[a].out.add(b)
self.vertices[b].in_count += 1

def traverse(self) -> Iterator[Any]:
"""
Returns the items in this graph in topological order.
"""

if len(self.vertices) == 0:
return

# This method implements Kahn's algorithm. See
# https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm for
# more information.

# Create a copy of the counts of each inbound edge so we can mutate
# them.
in_counts = {obj: vertex.in_count for obj, vertex in self.vertices.items()}

# Find the roots of the graph.
queue = [obj for obj, in_count in in_counts.items() if in_count == 0]

# No roots of a graph with at least one vertex indicates a cycle.
if len(queue) == 0:
raise ValueError('cyclic')

while len(queue) > 0:
cur = queue.pop(0)
yield cur

for obj in self.vertices[cur].out:
in_counts[obj] -= 1
if in_counts[obj] == 0:
queue.append(obj)

assert sum(in_counts.values()) == 0, 'Traversal did not reach every vertex exactly once'


class DependencyManager:
"""
A manager that provides access to a set of dependencies for arbitrary object
instances and the ability to compute a sorted list of Watt keys that
represent the processing order for the dependencies.
"""

deps: DependencyMapping
injectors: Mapping[Any, DependencyInjector]

def __init__(self, deps: Collection[D]) -> None:
self.deps = {dep.__class__: dep for dep in deps}
self.injectors = defaultdict(lambda: DependencyInjector(self.deps))

def for_instance(self, obj: Any) -> DependencyInjector:
return self.injectors[obj]

def sorted_watt_keys(self) -> Sequence[Dependency]:
g = DependencyGraph()

for obj, injector in self.injectors.items():
for cls in injector.provides:
g.connect(obj, cls)
for cls in injector.wants:
g.connect(cls, obj)

return [self.deps[obj].watt_key() for obj in g.traverse() if obj in self.deps]
Loading