# Copyright 2018 Datawire. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import logging
import os
from ipaddress import ip_address
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, ValuesView
from typing import cast as typecast
from ..cache import Cache, NullCache
from ..config import Config
from ..constants import Constants
from ..fetch import ResourceFetcher
from ..utils import RichStatus, SavedSecret, SecretHandler, SecretInfo, dump_json, parse_bool
from ..VERSION import Commit, Version
from .irambassador import IRAmbassador
from .irauth import IRAuth
from .irbasemapping import IRBaseMapping
from .irbasemappinggroup import IRBaseMappingGroup
from .ircluster import IRCluster
from .irerrorresponse import IRErrorResponse
from .irfilter import IRFilter
from .irgofilter import IRGOFilter
from .irhost import HostFactory, IRHost
from .irhttpmapping import IRHTTPMapping
from .irlistener import IRListener, ListenerFactory
from .irlogservice import IRLogService, IRLogServiceFactory
from .irmappingfactory import MappingFactory
from .irratelimit import IRRateLimit
from .irresource import IRResource
from .irserviceresolver import IRServiceResolver, IRServiceResolverFactory, SvcEndpointSet
from .irtls import IRAmbassadorTLS, TLSModuleFactory
from .irtlscontext import IRTLSContext, TLSContextFactory
from .irtracing import IRTracing
#############################################################################
## ir.py -- the Ambassador Intermediate Representation (IR)
##
## After getting an ambassador.Config, you can create an ambassador.IR. The
## IR is the basis for everything else: you can use it to configure an Envoy
## or to run diagnostics.
##
## IRs are not meant to be terribly long-lived: if anything at all changes
## in your world, you should toss the IR and make a new one. In particular,
## it is _absolutely not OK_ to try to edit the contents of an IR and then
## re-run any of the generators -- IRs are to be considered immutable once
## created.
##
## This goes double in the incremental-reconfiguration world: the IRResources
## that make up the IR all point back to their IR to make life easier on the
## generators, so - to ease the transition to the incremental-reconfiguration
## world - right now we reset the IR pointer when we pull these objects out
## of the cache. In the future this should be fixed, but at present, you can
## really mess up your world if you try to have two active IRs sharing a
## cache.
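##
## A rough usage sketch (hypothetical driver code; the exact fetcher and
## SecretHandler setup depends on your environment):
##
##     aconf = Config()
##     fetcher = ResourceFetcher(logger, aconf)
##     fetcher.parse_yaml(manifests_yaml, k8s=True)
##     aconf.load_all(fetcher.sorted())
##
##     ir = IR(aconf, secret_handler=secret_handler)  # a SecretHandler is required
##     # ...then hand the IR to an Envoy-config generator or to diagnostics.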
IRFileChecker = Callable[[str], bool]
class IR:
ambassador_module: IRAmbassador
ambassador_id: str
ambassador_namespace: str
ambassador_nodename: str
aconf: Config
cache: Cache
clusters: Dict[str, IRCluster]
agent_active: bool
agent_service: Optional[str]
agent_origination_ctx: Optional[IRTLSContext]
edge_stack_allowed: bool
file_checker: IRFileChecker
filters: List[IRFilter]
groups: Dict[str, IRBaseMappingGroup]
grpc_services: Dict[str, IRCluster]
hosts: Dict[str, IRHost]
invalid: List[Dict]
invalidate_groups_for: List[str]
# The key for listeners is "{socket_protocol}-{bindaddr}-{port}" (see IRListener.bind_to())
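    # (e.g. "tcp-0.0.0.0-8080" or "udp-0.0.0.0-8443")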
listeners: Dict[str, IRListener]
log_services: Dict[str, IRLogService]
ratelimit: Optional[IRRateLimit]
redirect_cleartext_from: Optional[int]
resolvers: Dict[str, IRServiceResolver]
router_config: Dict[str, Any]
saved_resources: Dict[str, IRResource]
saved_secrets: Dict[str, SavedSecret]
secret_handler: SecretHandler
secret_root: str
sidecar_cluster_name: Optional[str]
tls_contexts: Dict[str, IRTLSContext]
tls_module: Optional[IRAmbassadorTLS]
tracing: Optional[IRTracing]
@classmethod
def check_deltas(
cls, logger: logging.Logger, fetcher: "ResourceFetcher", cache: Optional[Cache] = None
) -> Tuple[str, bool, List[str]]:
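        """
        Check the deltas from a ResourceFetcher against our cache (if we have one)
        to decide between a "complete" and an "incremental" reconfigure.

        Returns a tuple of (config_type, reset_cache, invalidate_groups_for):
        config_type is "complete" or "incremental"; reset_cache tells the caller
        whether the cache must be reset; invalidate_groups_for lists the cache keys
        of Mappings whose groups need to be invalidated.
        """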
# Assume that this should be marked as a complete reconfigure, and that we'll be
# resetting the cache.
config_type = "complete"
reset_cache = True
# to_invalidate is the list of things we can invalidate right now. If we're
# running with a cache, every valid Delta will get its cache key added into
# to_invalidate; after we finish looking at all the deltas, we'll invalidate
# all the entries in this list.
#
# Mapping deltas, though, are more complex: not only must we invalidate the
# Mapping, but we _also_ need to invalidate any cached Group that contains
# the Mapping (otherwise, adding a new Mapping to a cached Group won't work).
# This is messy, because the Delta doesn't have the information we need to
# compute the Group's cache key.
#
# We deal with this by adding the cache keys of any Mapping deltas to the
# invalidate_groups_for list, and then handing that to the IR so that the
# MappingFactory can use it to do the right thing.
#
# "But wait," I hear you cry, "you're only checking Mappings and TCPMappings
# right now anyway, so why bother separating these things?" That's because
# we expect the use of the cache to broaden, so we'll just go ahead and do
# this.
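        #
        # For reference, a delta looks roughly like this (a sketch; the exact field
        # set comes from the watcher, but only "kind" and "metadata" matter here):
        #
        #     { "kind": "Mapping",
        #       "metadata": { "name": "my-mapping", "namespace": "default" } }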
to_invalidate: List[str] = []
invalidate_groups_for: List[str] = []
# OK. If we don't have a cache, just skip all this crap.
if cache is not None:
# We have a cache. Start by assuming that we'll need to reset it,
# unless there are no deltas at all.
reset_cache = len(fetcher.deltas) > 0
# Next up: are there any deltas?
if fetcher.deltas:
# Yes. We're going to walk over them all and assemble a list
# of things to delete and a count of errors while processing our
# list.
delta_errors = 0
for delta in fetcher.deltas:
logger.debug(f"Delta: {delta}")
# The "kind" of a Delta must be a string; assert that to make
# mypy happy.
delta_kind = delta["kind"]
assert isinstance(delta_kind, str)
# Only worry about Mappings and TCPMappings right now.
if (delta_kind == "Mapping") or (delta_kind == "TCPMapping"):
# XXX C'mon, mypy, is this cast really necessary?
metadata = typecast(Dict[str, str], delta.get("metadata", {}))
name = metadata.get("name", "")
namespace = metadata.get("namespace", "")
if not name or not namespace:
# This is an error.
delta_errors += 1
logger.error(f"Delta object needs name and namespace: {delta}")
else:
key = IRBaseMapping.make_cache_key(delta_kind, name, namespace)
to_invalidate.append(key)
# If we're invalidating the Mapping, we need to invalidate its Group.
invalidate_groups_for.append(key)
# OK. If we have things to invalidate, and we have NO ERRORS...
if to_invalidate and not delta_errors:
# ...then we can invalidate all those things instead of clearing the cache.
reset_cache = False
for key in to_invalidate:
logger.debug(f"Delta: invalidating {key}")
cache.invalidate(key)
                # When all is said and done, it's an incremental reconfigure if we
                # don't need to reset the cache. (The actual cache reset is our
                # caller's job, based on the reset_cache value we return.)
                if not reset_cache:
                    config_type = "incremental"

            cache.dump("Checking incoming deltas (reset_cache %s)", reset_cache)
return (config_type, reset_cache, invalidate_groups_for)
def __init__(
self,
aconf: Config,
secret_handler: SecretHandler,
file_checker: Optional[IRFileChecker] = None,
logger: Optional[logging.Logger] = None,
invalidate_groups_for: Optional[List[str]] = None,
cache: Optional[Cache] = None,
        watch_only: bool = False,
) -> None:
# Initialize the basics...
self.ambassador_id = Config.ambassador_id
self.ambassador_namespace = Config.ambassador_namespace
self.ambassador_nodename = aconf.ambassador_nodename
self.statsd = aconf.statsd
# ...then make sure we have a logger...
self.logger = logger or logging.getLogger("ambassador.ir")
# ...then make sure we have a cache (which might be a NullCache)...
self.cache = cache or NullCache(self.logger)
self.invalidate_groups_for = invalidate_groups_for or []
# ...then, finally, grab all the invalid objects from the aconf. This is for metrics later.
self.invalid = aconf.invalid
self.cache.dump("Fetcher")
        # Figure out the secret root: where secrets will be stored on disk.
        self.secret_root = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
        # This setattr business is because mypy seems to think that, since self.file_checker is
        # callable, any mention of self.file_checker must be a function call. Sigh.
        setattr(self, "file_checker", file_checker if file_checker is not None else os.path.isfile)
# The secret_handler is _required_.
self.secret_handler = secret_handler
assert self.secret_handler, "Ambassador.IR requires a SecretHandler at initialization"
self.logger.debug("IR __init__:")
self.logger.debug("IR: Version %s built from commit %s" % (Version, Commit))
self.logger.debug("IR: AMBASSADOR_ID %s" % self.ambassador_id)
self.logger.debug("IR: Namespace %s" % self.ambassador_namespace)
self.logger.debug("IR: Nodename %s" % self.ambassador_nodename)
        self.logger.debug(
            "IR: Endpoints %s" % ("enabled" if Config.enable_endpoints else "disabled")
        )
self.logger.debug("IR: file checker: %s" % getattr(self, "file_checker").__name__)
self.logger.debug("IR: secret handler: %s" % type(self.secret_handler).__name__)
# First up: save the Config object. Its source map may be necessary later.
self.aconf = aconf
# Next, we'll want a way to keep track of resources we end up working
# with. It starts out empty.
self.saved_resources = {}
# Also, we have no saved secret stuff yet...
self.saved_secrets = {}
self.secret_info: Dict[str, SecretInfo] = {}
# ...and the initial IR state is empty _except for k8s_status_updates_.
#
# Note that we use a map for clusters, not a list -- the reason is that
# multiple mappings can use the same service, and we don't want multiple
# clusters.
self.breakers = {}
self.clusters = {}
self.filters = []
self.groups = {}
self.grpc_services = {}
self.hosts = {}
# self.invalidate_groups_for is handled above.
# self.k8s_status_updates is handled below.
self.listeners = {}
self.log_services = {}
self.outliers = {}
self.ratelimit = None
self.redirect_cleartext_from = None
self.resolvers = {}
        # self.saved_secrets and self.secret_info are handled above.
self.services = {}
self.sidecar_cluster_name = None
self.tls_contexts = {}
self.tls_module = None
self.tracing = None
# Copy k8s_status_updates from our aconf.
self.k8s_status_updates = aconf.k8s_status_updates
# Check on the intercept agent and edge stack. Note that the Edge Stack touchfile is _not_
# within $AMBASSADOR_CONFIG_BASE_DIR: it stays in /ambassador no matter what.
        self.agent_active = os.environ.get("AGENT_SERVICE", None) is not None
# Allow an environment variable to state whether we're in Edge Stack. But keep the
# existing condition as sufficient, so that there is less of a chance of breaking
# things running in a container with this file present.
self.edge_stack_allowed = parse_bool(
os.environ.get("EDGE_STACK", "false")
) or os.path.exists("/ambassador/.edge_stack")
self.agent_origination_ctx = None
# OK, time to get this show on the road. First things first: set up the
# Ambassador module.
#
# The Ambassador module is special: it doesn't do anything in its setup() method, but
# instead defers all its heavy lifting to its finalize() method. Why? Because we need
# to create the Ambassador module very early to allow IRResource.lookup() to work, but
# we need to go pull in secrets and such before we can get all the Ambassador-module
# stuff fully set up.
#
# So. First, create the module.
self.ambassador_module = typecast(
IRAmbassador, self.save_resource(IRAmbassador(self, aconf))
)
# Next, grab whatever information our aconf has about secrets...
self.save_secret_info(aconf)
# ...and then it's on to default TLS stuff, both from the TLS module and from
# any TLS contexts.
#
# XXX This feels like a hack -- shouldn't it be class-wide initialization
# in TLSModule or TLSContext? So far it's the only place we need anything like
# this though.
TLSModuleFactory.load_all(self, aconf)
TLSContextFactory.load_all(self, aconf)
# After TLSContexts, grab Listeners...
ListenerFactory.load_all(self, aconf)
        # Now that we know all of the listeners, we can check to see if there are any
        # shared bindings across protocols (TCP & UDP sharing the same address & port).
        # When a TCP/HTTP listener binds to the same address and port as a UDP/HTTP
        # listener, it gets marked http3_enabled=True, which causes the `alt-svc`
        # header to be auto-injected into HTTP responses on the TCP/HTTP listener.
        # The alt-svc header notifies clients (browsers, curl, libraries) that they
        # can upgrade TCP connections to UDP (HTTP/3) connections.
        #
        # Note: at first glance it would seem this logic should sit inside the Listener
        # class, but we wait until all the listeners are loaded so that we can check for
        # the existence of a "companion" TCP listener. If a UDP listener were the first
        # to be parsed, we wouldn't know that at parse time, so we have to wait until
        # all of them have been loaded.
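        #
        # For example, once http3_enabled is set, an HTTPS response from the TCP/HTTP
        # listener might carry a header like this (illustrative port and lifetime):
        #
        #     alt-svc: h3=":8443"; ma=86400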
        udp_listeners = (l for l in self.listeners.values() if l.socket_protocol == "UDP")
        for udp_listener in udp_listeners:
            # This matches `listener.bind_to()` for the companion TCP listener.
            tcp_listener_key = f"tcp-{udp_listener.bind_address}-{udp_listener.port}"
            tcp_listener = self.listeners.get(tcp_listener_key, None)
            if tcp_listener is not None and "HTTP" in tcp_listener.protocolStack:
                tcp_listener.http3_enabled = True
# ...then grab whatever we know about Hosts...
HostFactory.load_all(self, aconf)
# ...then set up for the intercept agent, if that's a thing.
self.agent_init(aconf)
# Finally, finalize all the Host stuff (including the !*@#&!* fallback context)...
HostFactory.finalize(self, aconf)
# Now we can finalize the Ambassador module, to tidy up secrets et al. We do this
# here so that secrets and TLS contexts are available.
if not self.ambassador_module.finalize(self, aconf):
# Uhoh.
self.ambassador_module.set_active(False) # This can't be good.
_activity_str = "watching" if watch_only else "starting"
_mode_str = "OSS"
if self.agent_active:
_mode_str = "Intercept Agent"
elif self.edge_stack_allowed:
_mode_str = "Edge Stack"
self.logger.debug(f"IR: {_activity_str} {_mode_str}")
# Next up, initialize our IRServiceResolvers...
IRServiceResolverFactory.load_all(self, aconf)
# ...and then we can finalize the agent, if that's a thing.
self.agent_finalize(aconf)
# Once here, if we're only watching, we're done.
if watch_only:
return
# REMEMBER FOR SAVING YOU NEED TO CALL save_resource!
# THIS IS VERY IMPORTANT!
# Save circuit breakers, outliers, and services.
self.breakers = aconf.get_config("CircuitBreaker") or {}
self.outliers = aconf.get_config("OutlierDetection") or {}
self.services = aconf.get_config("service") or {}
# Save tracing, ratelimit, and logging settings.
self.tracing = typecast(IRTracing, self.save_resource(IRTracing(self, aconf)))
self.ratelimit = typecast(IRRateLimit, self.save_resource(IRRateLimit(self, aconf)))
IRLogServiceFactory.load_all(self, aconf)
# After the Ambassador and TLS modules are done, we need to set up the
# filter chains. Note that order of the filters matters. Start with CORS,
# so that preflights will work for all filters.
self.save_filter(
IRFilter(ir=self, aconf=aconf, rkey="ir.cors", kind="ir.cors", name="cors", config={})
)
# We always add the Go filter calling to our filter plugin service if applicable before auth
self.save_filter(IRGOFilter(ir=self, aconf=aconf))
# Next is auth...
self.save_filter(IRAuth(self, aconf))
# ...then the ratelimit filter...
if self.ratelimit:
self.save_filter(self.ratelimit, already_saved=True)
# ...and the error response filter...
self.save_filter(
IRErrorResponse(
self,
aconf,
self.ambassador_module.get("error_response_overrides", None),
referenced_by_obj=self.ambassador_module,
)
)
# ...and, finally, the barely-configurable router filter.
router_config = {}
if self.tracing:
router_config["start_child_span"] = True
self.save_filter(
IRFilter(
ir=self,
aconf=aconf,
rkey="ir.router",
kind="ir.router",
name="router",
type="decoder",
config=router_config,
)
)
# We would handle other modules here -- but guess what? There aren't any.
# At this point ambassador, tls, and the deprecated auth module are all there
# are, and they're handled above. So. At this point go sort out all the Mappings.
MappingFactory.load_all(self, aconf)
self.walk_saved_resources(aconf, "add_mappings")
TLSModuleFactory.finalize(self, aconf)
MappingFactory.finalize(self, aconf)
# We can't finalize the listeners until _after_ we have all the TCPMapping
# information we might need, so that happens here.
ListenerFactory.finalize(self, aconf)
# At this point we should know the full set of clusters, so we can generate
# appropriate envoy names.
#
# Envoy cluster name generation happens in two steps. First, we check every
# cluster and set the envoy name to the cluster name if it is short enough.
# If it isn't, we group all of the long cluster names by a common prefix
# and normalize them later.
#
# This ensures that:
# - All IRCluster objects have an envoy_name
        # - All envoy_name fields are valid cluster names, i.e. they are short enough
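        #
        # For example (illustrative name and digest): a too-long cluster name like
        #     cluster_quote_default_cluster_quote_default_cluster_quote_default_8080
        # gets compressed to its first 40 characters plus 16 hex digits of its SHA-1:
        #     cluster_quote_default_cluster_quote_defa-0123456789ABCDEF
        # and any names that share a compressed form are then de-collided below with
        # a trailing -0, -1, ... suffix.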
collisions: Dict[str, List[str]] = {}
for name in sorted(self.clusters.keys()):
if len(name) > 60:
# Too long. Gather this cluster by name prefix and normalize
# its name below.
h = hashlib.new("sha1")
h.update(name.encode("utf-8"))
hd = h.hexdigest()[0:16].upper()
short_name = name[0:40] + "-" + hd
self.logger.debug(f"COLLISION: compress {name} to {short_name}")
collision_list = collisions.setdefault(short_name, [])
collision_list.append(name)
else:
# Short enough, set the envoy name to the cluster name.
self.clusters[name]["envoy_name"] = name
for short_name in sorted(collisions.keys()):
name_list = collisions[short_name]
i = 0
for name in sorted(name_list):
mangled_name = "%s-%d" % (short_name, i)
i += 1
cluster = self.clusters[name]
self.logger.debug("COLLISION: mangle %s => %s" % (name, mangled_name))
# We must not modify a cluster's name (nor its rkey, for that matter)
# because our object caching implementation depends on stable object
# names and keys. If we were to update it, we could lose track of an
# existing object and accidentally create a duplicate (tested in
# python/tests/test_cache.py test_long_cluster_1).
#
# Instead, the resulting IR must set envoy_name to the mangled name, which
# is guaranteed to be valid in envoy configuration.
#
# An important consequence of this choice is that we must never read back
# envoy config to create IRCluster config, since the cluster names are
# not necessarily the same. This is currently fine, since we never use
# envoy config as a source of truth - we leave that to the cluster annotations
# and CRDs.
#
# Another important consideration is that when the cache is active, we need
# to shred any cached cluster with this mangled_name, because the mangled_name
# can change as new clusters appear! This is obviously not ideal.
#
# XXX This is doubly a hack because it's duplicating this magic format from
# v2cluster.py and v3cluster.py.
self.cache.invalidate(f"V2-{cluster.cache_key}")
self.cache.invalidate(f"V3-{cluster.cache_key}")
self.cache.dump(
"Invalidate clusters V2-%s, V3-%s", cluster.cache_key, cluster.cache_key
)
# OK. Finally, we can update the envoy_name.
cluster["envoy_name"] = mangled_name
self.logger.debug("COLLISION: envoy_name %s" % cluster["envoy_name"])
# After we have the cluster names fixed up, go finalize filters.
if self.tracing:
self.tracing.finalize()
if self.ratelimit:
self.ratelimit.finalize()
for filter in self.filters:
filter.finalize()
# XXX Brutal hackery here! Probably this is a clue that Config and IR and such should have
# a common container that can hold errors.
def post_error(
self,
rc: Union[str, RichStatus],
resource: Optional[IRResource] = None,
rkey: Optional[str] = None,
log_level=logging.INFO,
):
self.aconf.post_error(rc, resource=resource, rkey=rkey, log_level=log_level)
def agent_init(self, aconf: Config) -> None:
"""
Initialize as the Intercept Agent, if we're doing that.
THIS WHOLE METHOD NEEDS TO GO AWAY: instead, just configure the agent with CRDs as usual.
However, that's just too painful to contemplate without `edgectl inject-agent`.
:param aconf: Config to work with
:return: None
"""
# Intercept stuff is an Edge Stack thing.
if not (self.edge_stack_allowed and self.agent_active):
self.logger.debug("Intercept agent not active, skipping initialization")
return
self.agent_service = os.environ.get("AGENT_SERVICE", None)
if self.agent_service is None:
# This is technically impossible, but whatever.
self.logger.info("Intercept agent active but no AGENT_SERVICE? skipping initialization")
self.agent_active = False
return
self.logger.debug(f"Intercept agent active for {self.agent_service}, initializing")
# We're going to either create a Host to terminate TLS, or to do cleartext. In neither
# case will we do ACME. Set additionalPort to -1 so we don't grab 8080 in the TLS case.
host_args: Dict[str, Any] = {
"hostname": "*",
"selector": {"matchLabels": {"intercept": self.agent_service}},
"acmeProvider": {"authority": "none"},
"requestPolicy": {
"insecure": {
"additionalPort": -1,
},
},
}
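        # (host_args amounts to the spec of the synthesized Host: a wildcard hostname,
        # a selector matching the intercepted service, ACME disabled, and no extra
        # insecure port.)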
# Have they asked us to do TLS?
agent_termination_secret = os.environ.get("AGENT_TLS_TERM_SECRET", None)
if agent_termination_secret:
# Yup.
host_args["tlsSecret"] = {"name": agent_termination_secret}
else:
# No termination secret, so do cleartext.
host_args["requestPolicy"]["insecure"]["action"] = "Route"
host = IRHost(
self,
aconf,
rkey=self.ambassador_module.rkey,
location=self.ambassador_module.location,
name="agent-host",
**host_args,
)
if host.is_active():
host.referenced_by(self.ambassador_module)
host.sourced_by(self.ambassador_module)
self.logger.debug(f"Intercept agent: saving host {host}")
# self.logger.debug(host.as_json())
self.save_host(host)
else:
self.logger.debug(f"Intercept agent: not saving inactive host {host}")
# How about originating TLS?
agent_origination_secret = os.environ.get("AGENT_TLS_ORIG_SECRET", None)
if agent_origination_secret:
# Uhhhh. Synthesize a TLSContext for this, I guess.
#
# XXX What if they already have a context with this name?
ctx = IRTLSContext(
self,
aconf,
rkey=self.ambassador_module.rkey,
location=self.ambassador_module.location,
name="agent-origination-context",
secret=agent_origination_secret,
)
ctx.referenced_by(self.ambassador_module)
self.save_tls_context(ctx)
self.logger.debug(f"Intercept agent: saving origination TLSContext {ctx.name}")
# self.logger.debug(ctx.as_json())
self.agent_origination_ctx = ctx
def agent_finalize(self, aconf) -> None:
if not (self.edge_stack_allowed and self.agent_active):
self.logger.debug(f"Intercept agent not active, skipping finalization")
return
# self.logger.info(f"Intercept agent active for {self.agent_service}, finalizing")
# We don't want to listen on the default AES ports (8080, 8443) as that is likely to
# conflict with the user's application running in the same Pod.
agent_listen_port_str = os.environ.get("AGENT_LISTEN_PORT", None)
agent_grpc = os.environ.get("AGENT_ENABLE_GRPC", "false")
if agent_listen_port_str is None:
self.ambassador_module.service_port = Constants.SERVICE_PORT_AGENT
else:
try:
self.ambassador_module.service_port = int(agent_listen_port_str)
except ValueError:
self.post_error(f"Intercept agent listen port {agent_listen_port_str} is not valid")
self.agent_active = False
return
agent_port_str = os.environ.get("AGENT_PORT", None)
if agent_port_str is None:
self.post_error("Intercept agent requires both AGENT_SERVICE and AGENT_PORT to be set")
self.agent_active = False
return
agent_port = -1
try:
agent_port = int(agent_port_str)
        except ValueError:
self.post_error(f"Intercept agent port {agent_port_str} is not valid")
self.agent_active = False
return
# self.logger.info(f"Intercept agent active for {self.agent_service}:{agent_port}, adding fallback mapping")
# XXX OMG this is a crock. Don't use precedence -1000000 for this, because otherwise Edge
# Stack might decide it's the Edge Policy Console fallback mapping and force it to be
# routed insecure. !*@&#*!@&#* We need per-mapping security settings.
#
# XXX What if they already have a mapping with this name?
ctx_name = None
if self.agent_origination_ctx:
ctx_name = self.agent_origination_ctx.name
mapping = IRHTTPMapping(
self,
aconf,
rkey=self.ambassador_module.rkey,
location=self.ambassador_module.location,
name="agent-fallback-mapping",
metadata_labels={"ambassador_diag_class": "private"},
prefix="/",
rewrite="/",
service=f"127.0.0.1:{agent_port}",
grpc=agent_grpc,
# Making sure we don't have shorter timeouts on intercepts than the original Mapping
timeout_ms=60000,
idle_timeout_ms=60000,
tls=ctx_name,
precedence=-999999,
) # No, really. See comment above.
mapping.referenced_by(self.ambassador_module)
self.add_mapping(aconf, mapping)
def cache_fetch(self, key: str) -> Optional[IRResource]:
"""
Fetch a key from our cache. If we get anything, make sure that its
IR pointer is set back to us -- since the cache can easily outlive
the IR, chances are pretty high that the object might've originally
been part of a different IR.
Yes, this implies that trying to use the cache for multiple IRs at
the same time is a Very Bad Idea.
"""
rsrc = self.cache[key]
# Did we get anything?
if rsrc is not None:
# By definition, anything the IR layer pulls from the cache must be
# an IRResource.
assert isinstance(rsrc, IRResource)
# Since it's an IRResource, it has a pointer to the IR. Reset that.
rsrc.ir = self
return rsrc
def cache_add(self, rsrc: IRResource) -> None:
"""
Add an IRResource to our cache. Mostly this is here to let mypy check
that everything cached by the IR layer is an IRResource.
"""
self.cache.add(rsrc)
def cache_link(self, owner: IRResource, owned: IRResource) -> None:
"""
Link two IRResources in our cache. Mostly this is here to let mypy check
that everything linked by the IR layer is an IRResource.
"""
self.cache.link(owner, owned)
def save_resource(self, resource: IRResource) -> IRResource:
if resource.is_active():
self.saved_resources[resource.rkey] = resource
return resource
def save_host(self, host: IRHost) -> None:
extant_host = self.hosts.get(host.name, None)
is_valid = True
if extant_host:
self.post_error(
"Duplicate Host %s; keeping definition from %s" % (host.name, extant_host.location)
)
is_valid = False
if is_valid:
self.hosts[host.name] = host
# Get saved hosts.
def get_hosts(self) -> List[IRHost]:
return list(self.hosts.values())
# Save secrets from our aconf.
def save_secret_info(self, aconf):
aconf_secrets = aconf.get_config("secrets") or {}
self.logger.debug(f"IR: aconf has secrets: {aconf_secrets.keys()}")
for secret_key, aconf_secret in aconf_secrets.items():
# Ignore anything that doesn't at least have a public half.
#
# (We include 'user_key' here because ACME private keys use that, and they
# should not generate errors.)
# (We include 'crl_pem' here because CRL secrets use that, and they
# should not generate errors.)
if (
aconf_secret.get("tls_crt")
or aconf_secret.get("cert-chain_pem")
or aconf_secret.get("user_key")
or aconf_secret.get("crl_pem")
):
secret_info = SecretInfo.from_aconf_secret(aconf_secret)
secret_name = secret_info.name
secret_namespace = secret_info.namespace
self.logger.debug(
'saving "%s.%s" (from %s) in secret_info',
secret_name,
secret_namespace,
secret_key,
)
self.secret_info[f"{secret_name}.{secret_namespace}"] = secret_info
else:
self.logger.debug(
"not saving secret_info from %s because there is no public half", secret_key
)
def save_tls_context(self, ctx: IRTLSContext) -> None:
extant_ctx = self.tls_contexts.get(ctx.name, None)
is_valid = True
if extant_ctx:
self.post_error(
"Duplicate TLSContext %s; keeping definition from %s"
% (ctx.name, extant_ctx.location)
)
is_valid = False
if ctx.get("redirect_cleartext_from", None) is not None:
if self.redirect_cleartext_from is None:
self.redirect_cleartext_from = ctx.redirect_cleartext_from
else:
if self.redirect_cleartext_from != ctx.redirect_cleartext_from:
                    self.post_error(
                        "TLSContext %s: configured conflicting redirect_cleartext_from port: %s"
                        % (ctx.name, ctx.redirect_cleartext_from)
                    )
is_valid = False
if is_valid:
self.tls_contexts[ctx.name] = ctx
def get_resolver(self, name: str) -> Optional[IRServiceResolver]:
return self.resolvers.get(name, None)
def add_resolver(self, resolver: IRServiceResolver) -> None:
self.resolvers[resolver.name] = resolver
def has_tls_context(self, name: str) -> bool:
return bool(self.get_tls_context(name))
def get_tls_context(self, name: str) -> Optional[IRTLSContext]:
return self.tls_contexts.get(name, None)
def get_tls_contexts(self) -> ValuesView[IRTLSContext]:
return self.tls_contexts.values()
def resolve_secret(self, resource: IRResource, secret_name: str, namespace: str):
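        """
        Resolve a secret to a SavedSecret, checking (in order) our saved_secrets
        cache, any secret_info we got from the aconf, and finally the SecretHandler.
        Always returns a SavedSecret; if the secret can't be found, it's a bogus
        SavedSecret with all its fields set to None.
        """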
# OK. Do we already have a SavedSecret for this?
ss_key = f"{secret_name}.{namespace}"
ss = self.saved_secrets.get(ss_key, None)
if ss:
# Done. Return it.
self.logger.debug(f"resolve_secret {ss_key}: using cached SavedSecret")
self.secret_handler.still_needed(resource, secret_name, namespace)
return ss
# OK, do we have a secret_info for it??
# self.logger.debug(f"resolve_secret {ss_key}: checking secret_info")
secret_info = self.secret_info.get(ss_key, None)
if secret_info:
self.logger.debug(f"resolve_secret {ss_key}: found secret_info")
self.secret_handler.still_needed(resource, secret_name, namespace)
else:
# No secret_info, so ask the secret_handler to find us one.
self.logger.debug(f"resolve_secret {ss_key}: no secret_info, asking handler to load")
secret_info = self.secret_handler.load_secret(resource, secret_name, namespace)
if not secret_info:
self.logger.error(f"Secret {ss_key} unknown")
ss = SavedSecret(secret_name, namespace, None, None, None, None, None)
else:
self.logger.debug(f"resolve_secret {ss_key}: found secret, asking handler to cache")
# OK, we got a secret_info. Cache that using the secret handler.
ss = self.secret_handler.cache_secret(resource, secret_info)
        # Save this for next time, under the same key we use for lookups.
        self.saved_secrets[ss_key] = ss
return ss
def resolve_resolver(
self, cluster: IRCluster, resolver_name: Optional[str]
) -> IRServiceResolver:
# Which resolver should we use?
if not resolver_name:
resolver_name = self.ambassador_module.get("resolver", "kubernetes-service")
# Casting to str is OK because the Ambassador module's resolver must be a string,
# so all the paths for resolver_name land with it being a string.
resolver = self.get_resolver(typecast(str, resolver_name))
assert resolver is not None
return resolver
def resolve_targets(
self,
cluster: IRCluster,
resolver_name: Optional[str],
hostname: str,
namespace: str,
port: int,
) -> Optional[SvcEndpointSet]:
# Is the host already an IP address?
is_ip_address = False
        try:
            ip_address(hostname)
            is_ip_address = True
        except ValueError:
            pass
if is_ip_address:
# Already an IP address, great.
self.logger.debug(f"cluster {cluster.name}: {hostname} is already an IP address")
return [{"ip": hostname, "port": port, "target_kind": "IPaddr"}]
resolver = self.resolve_resolver(cluster, resolver_name)
# It should not be possible for resolver to be unset here.
if not resolver:
self.post_error(
f"cluster {cluster.name} has invalid resolver {resolver_name}?", rkey=cluster.rkey
)
return None
# OK, ask the resolver for the target list. Understanding the mechanics of resolution
# and the load balancer policy and all that is up to the resolver.
return resolver.resolve(self, cluster, hostname, namespace, port)
def save_filter(self, resource: IRFilter, already_saved=False) -> None:
if resource.is_active():
if not already_saved:
resource = typecast(IRFilter, self.save_resource(resource))
self.filters.append(resource)
def walk_saved_resources(self, aconf, method_name):
for res in self.saved_resources.values():
getattr(res, method_name)(self, aconf)
def save_listener(self, listener: IRListener) -> None:
listener_key = listener.bind_to()
extant_listener = self.listeners.get(listener_key, None)
is_valid = True
if extant_listener:
err_msg = (
f"Duplicate listener {listener.name} on {listener.socket_protocol.lower()}://{listener.bind_address}:{listener.port};"
f" keeping definition from {extant_listener.location}"
)
self.post_error(err_msg)
is_valid = False
if is_valid:
self.listeners[listener_key] = listener
def add_mapping(self, aconf: Config, mapping: IRBaseMapping) -> Optional[IRBaseMappingGroup]:
mapping.check_status()
if mapping.is_active():
if mapping.group_id not in self.groups:
# Is this group in our external cache?
group_key = mapping.group_class().key_for_id(mapping.group_id)
group = self.cache_fetch(group_key)
if group is not None:
self.logger.debug(f"IR: got group from cache for {mapping.name}")
else:
self.logger.debug(f"IR: synthesizing group for {mapping.name}")
group_name = "GROUP: %s" % mapping.name
group_class = mapping.group_class()
group = group_class(
ir=self,
aconf=aconf,
location=mapping.location,
name=group_name,