-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
Copy pathtranslator.py
464 lines (398 loc) · 22.9 KB
/
translator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
import copy
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from boto3 import Session
from samtranslator.feature_toggle.feature_toggle import (
FeatureToggle,
FeatureToggleDefaultConfigProvider,
)
from samtranslator.internal.types import GetManagedPolicyMap
from samtranslator.intrinsics.actions import FindInMapAction
from samtranslator.intrinsics.resolver import IntrinsicsResolver
from samtranslator.intrinsics.resource_refs import SupportedResourceReferences
from samtranslator.metrics.method_decorator import MetricsMethodWrapperSingleton
from samtranslator.metrics.metrics import DummyMetricsPublisher, Metrics
from samtranslator.model import Resource, ResourceResolver, ResourceTypeResolver, sam_resources
from samtranslator.model.api.api_generator import SharedApiUsagePlan
from samtranslator.model.eventsources.push import Api
from samtranslator.model.exceptions import (
DuplicateLogicalIdException,
ExceptionWithMessage,
InvalidDocumentException,
InvalidEventException,
InvalidResourceException,
InvalidTemplateException,
)
from samtranslator.model.preferences.deployment_preference_collection import DeploymentPreferenceCollection
from samtranslator.model.sam_resources import SamConnector
from samtranslator.parser.parser import Parser
from samtranslator.plugins import BasePlugin, LifeCycleEvents
from samtranslator.plugins.api.default_definition_body_plugin import DefaultDefinitionBodyPlugin
from samtranslator.plugins.application.serverless_app_plugin import ServerlessAppPlugin
from samtranslator.plugins.globals.globals_plugin import GlobalsPlugin
from samtranslator.plugins.policies.policy_templates_plugin import PolicyTemplatesForResourcePlugin
from samtranslator.plugins.sam_plugins import SamPlugins
from samtranslator.policy_template_processor.processor import PolicyTemplatesProcessor
from samtranslator.sdk.parameter import SamParameterValues
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.translator.verify_logical_id import verify_unique_logical_id
from samtranslator.utils.actions import ResolveDependsOn
from samtranslator.utils.traverse import traverse
from samtranslator.validator.value_validator import sam_expect
class Translator:
"""Translates SAM templates into CloudFormation templates"""
def __init__(
self,
managed_policy_map: Optional[Dict[str, str]],
sam_parser: Parser,
plugins: Optional[List[BasePlugin]] = None,
boto_session: Optional[Session] = None,
metrics: Optional[Metrics] = None,
) -> None:
"""
:param dict managed_policy_map: Map of managed policy names to the ARNs
:param sam_parser: Instance of a SAM Parser
:param list of samtranslator.plugins.BasePlugin plugins: List of plugins to be installed in the translator,
in addition to the default ones.
"""
self.managed_policy_map = managed_policy_map
self.plugins = plugins
self.sam_parser = sam_parser
self.feature_toggle: Optional[FeatureToggle] = None
self.boto_session = boto_session
self.metrics = metrics if metrics else Metrics("ServerlessTransform", DummyMetricsPublisher())
MetricsMethodWrapperSingleton.set_instance(self.metrics)
self.document_errors: List[ExceptionWithMessage] = []
if self.boto_session:
ArnGenerator.BOTO_SESSION_REGION_NAME = self.boto_session.region_name
def _get_function_names(
self, resource_dict: Dict[str, Any], intrinsics_resolver: IntrinsicsResolver
) -> Dict[str, str]:
"""
:param resource_dict: AWS::Serverless::Function resource is provided as input
:param intrinsics_resolver: to resolve intrinsics for function_name
:return: a dictionary containing api_logical_id as the key and concatenated String of all function_names
associated with this api as the value
"""
if resource_dict.get("Type", "").strip() == "AWS::Serverless::Function":
events_properties = resource_dict.get("Properties", {}).get("Events", {})
events = list(events_properties.values()) if events_properties else []
for item in events:
# If the function event type is `Api` then gets the function name and
# adds to the function_names dict with key as the api_name and value as the function_name
item_properties = item.get("Properties", {})
if item.get("Type") == "Api" and item_properties.get("RestApiId"):
rest_api = item_properties.get("RestApiId")
api_name = Api.get_rest_api_id_string(rest_api)
if not isinstance(api_name, str):
continue
raw_function_name = resource_dict.get("Properties", {}).get("FunctionName")
resolved_function_name = intrinsics_resolver.resolve_parameter_refs(
copy.deepcopy(raw_function_name)
)
if not resolved_function_name:
continue
self.function_names.setdefault(api_name, "")
self.function_names[api_name] += str(resolved_function_name)
return self.function_names
def translate( # noqa: PLR0912, PLR0915
self,
sam_template: Dict[str, Any],
parameter_values: Dict[str, Any],
feature_toggle: Optional[FeatureToggle] = None,
passthrough_metadata: Optional[bool] = False,
get_managed_policy_map: Optional[GetManagedPolicyMap] = None,
) -> Dict[str, Any]:
"""Loads the SAM resources from the given SAM manifest, replaces them with their corresponding
CloudFormation resources, and returns the resulting CloudFormation template.
:param dict sam_template: the SAM manifest, as loaded by json.load() or yaml.load(), or as provided by \
CloudFormation transforms.
:param dict parameter_values: Map of template parameter names to their values. It is a required parameter that
should at least be an empty map. By providing an empty map, the caller explicitly opts-into the idea
that some functionality that relies on resolving parameter references might not work as expected
(ex: auto-creating new Lambda Version when CodeUri contains reference to template parameter). This is
why this parameter is required
:returns: a copy of the template with SAM resources replaced with the corresponding CloudFormation, which may \
be dumped into a valid CloudFormation JSON or YAML template
"""
self.feature_toggle = feature_toggle or FeatureToggle(
FeatureToggleDefaultConfigProvider(), stage=None, account_id=None, region=None
)
self.function_names: Dict[Any, Any] = {}
self.redeploy_restapi_parameters = {}
sam_parameter_values = SamParameterValues(parameter_values)
sam_parameter_values.add_default_parameter_values(sam_template)
sam_parameter_values.add_pseudo_parameter_values(self.boto_session)
parameter_values = sam_parameter_values.parameter_values
# Create & Install plugins
sam_plugins = prepare_plugins(self.plugins, parameter_values)
self.sam_parser.parse(sam_template=sam_template, parameter_values=parameter_values, sam_plugins=sam_plugins)
# replaces Connectors attributes with serverless Connector resources
resources = sam_template.get("Resources", {})
embedded_connectors = self._get_embedded_connectors(resources)
connector_resources = self._update_resources(embedded_connectors)
resources.update(connector_resources)
self._delete_connectors_attribute(resources)
template = copy.deepcopy(sam_template)
macro_resolver = ResourceTypeResolver(sam_resources)
intrinsics_resolver = IntrinsicsResolver(parameter_values)
# ResourceResolver is used by connector, its "resources" will be
# updated in-place by other transforms so connector transform
# can see the transformed resources.
resource_resolver = ResourceResolver(template.get("Resources", {}))
mappings_resolver = IntrinsicsResolver(
template.get("Mappings", {}), {FindInMapAction.intrinsic_name: FindInMapAction()}
)
deployment_preference_collection = DeploymentPreferenceCollection()
supported_resource_refs = SupportedResourceReferences()
shared_api_usage_plan = SharedApiUsagePlan()
changed_logical_ids = {}
route53_record_set_groups: Dict[Any, Any] = {}
for logical_id, resource_dict in self._get_resources_to_iterate(sam_template, macro_resolver):
try:
macro = macro_resolver.resolve_resource_type(resource_dict).from_dict(
logical_id, resource_dict, sam_plugins=sam_plugins
)
kwargs = macro.resources_to_link(sam_template["Resources"])
kwargs["managed_policy_map"] = self.managed_policy_map
kwargs["get_managed_policy_map"] = get_managed_policy_map
kwargs["intrinsics_resolver"] = intrinsics_resolver
kwargs["mappings_resolver"] = mappings_resolver
kwargs["deployment_preference_collection"] = deployment_preference_collection
kwargs["conditions"] = template.get("Conditions")
kwargs["resource_resolver"] = resource_resolver
kwargs["original_template"] = sam_template
# add the value of FunctionName property if the function is referenced with the api resource
self.redeploy_restapi_parameters["function_names"] = self._get_function_names(
resource_dict, intrinsics_resolver
)
kwargs["redeploy_restapi_parameters"] = self.redeploy_restapi_parameters
kwargs["shared_api_usage_plan"] = shared_api_usage_plan
kwargs["feature_toggle"] = self.feature_toggle
kwargs["route53_record_set_groups"] = route53_record_set_groups
translated = macro.to_cloudformation(**kwargs)
supported_resource_refs = macro.get_resource_references(translated, supported_resource_refs)
# Some resources mutate their logical ids. Track those to change all references to them:
if logical_id != macro.logical_id:
changed_logical_ids[logical_id] = macro.logical_id
del template["Resources"][logical_id]
for resource in translated:
if verify_unique_logical_id(resource, sam_template["Resources"]):
# For each generated resource, pass through existing metadata that may exist on the original SAM resource.
_r = resource.to_dict()
if (
resource_dict.get("Metadata")
and passthrough_metadata
and not template["Resources"].get(resource.logical_id)
):
_r[resource.logical_id]["Metadata"] = resource_dict["Metadata"]
template["Resources"].update(_r)
else:
self.document_errors.append(
DuplicateLogicalIdException(logical_id, resource.logical_id, resource.resource_type)
)
except (InvalidResourceException, InvalidEventException, InvalidTemplateException) as e:
self.document_errors.append(e)
if deployment_preference_collection.any_enabled():
template["Resources"].update(deployment_preference_collection.get_codedeploy_application().to_dict())
if deployment_preference_collection.needs_resource_condition():
new_conditions = deployment_preference_collection.create_aggregate_deployment_condition()
if new_conditions:
template.get("Conditions", {}).update(new_conditions)
if not deployment_preference_collection.can_skip_service_role():
template["Resources"].update(deployment_preference_collection.get_codedeploy_iam_role().to_dict())
for logical_id in deployment_preference_collection.enabled_logical_ids():
try:
template["Resources"].update(
deployment_preference_collection.deployment_group(logical_id).to_dict()
)
except InvalidResourceException as e:
self.document_errors.append(e)
# Run the after-transform plugin target
try:
sam_plugins.act(LifeCycleEvents.after_transform_template, template)
except (InvalidDocumentException, InvalidResourceException, InvalidTemplateException) as e:
self.document_errors.append(e)
# Cleanup
if "Transform" in template:
del template["Transform"]
if len(self.document_errors) == 0:
resolveDependsOn = ResolveDependsOn(resolution_data=changed_logical_ids) # Initializes ResolveDependsOn
template = traverse(template, [resolveDependsOn])
template = intrinsics_resolver.resolve_sam_resource_id_refs(template, changed_logical_ids)
return intrinsics_resolver.resolve_sam_resource_refs(template, supported_resource_refs)
raise InvalidDocumentException(self.document_errors)
# private methods
def _get_resources_to_iterate(
self, sam_template: Dict[str, Any], macro_resolver: ResourceTypeResolver
) -> List[Tuple[str, Dict[str, Any]]]:
"""
Returns a list of resources to iterate, order them based on the following order:
1. AWS::Serverless::Function - because API Events need to modify the corresponding Serverless::Api resource.
2. AWS::Serverless::StateMachine - because API Events need to modify the corresponding Serverless::Api resource.
3. AWS::Serverless::Api
4. Anything else
5. AWS::Serverless::Connector - because connector profiles only work with raw CloudFormation resources
This is necessary because a Function or State Machine resource with API Events will modify the API resource's Swagger JSON.
Therefore API resource needs to be parsed only after all the Swagger modifications are complete.
:param dict sam_template: SAM template
:param macro_resolver: Resolver that knows if a resource can be processed or not
:return list: List containing tuple of (logicalId, resource_dict) in the order of processing
"""
functions = []
statemachines = []
apis = []
others = []
connectors = []
resources = sam_template["Resources"]
for logicalId, resource in resources.items():
data = (logicalId, resource)
# Skip over the resource if it is not a SAM defined Resource
if not macro_resolver.can_resolve(resource):
continue
if resource["Type"] == "AWS::Serverless::Function":
functions.append(data)
elif resource["Type"] == "AWS::Serverless::StateMachine":
statemachines.append(data)
elif resource["Type"] in ("AWS::Serverless::Api", "AWS::Serverless::HttpApi"):
apis.append(data)
elif resource["Type"] == "AWS::Serverless::Connector":
connectors.append(data)
else:
others.append(data)
return functions + statemachines + apis + others + connectors
@staticmethod
def _update_resources(connectors_list: List[Resource]) -> Dict[str, Any]:
connector_resources = {}
for connector in connectors_list:
connector_resources.update(connector.to_dict())
return connector_resources
@staticmethod
def _delete_connectors_attribute(resources: Dict[str, Any]) -> None:
for resource in resources.values():
if "Connectors" not in resource:
continue
del resource["Connectors"]
def _get_embedded_connectors(self, resources: Dict[str, Any]) -> List[Resource]:
"""
Loops through the SAM Template resources to find any connectors that have been attached to the resources.
Converts those attached connectors into Connector resources and returns a list of them
:param dict resources: Dict of resources from the SAM template
:return List[SamConnector]: List of the generated SAM Connectors
"""
connectors = []
# Loop through the resources in the template and see if any connectors have been attached
for source_logical_id, resource in resources.items():
if "Connectors" not in resource:
continue
try:
sam_expect(
resource.get("Connectors"),
source_logical_id,
f"{source_logical_id}.Connectors",
is_resource_attribute=True,
).to_be_a_map()
except InvalidResourceException as e:
self.document_errors.append(e)
continue
for connector_logical_id, connector_dict in resource["Connectors"].items():
try:
full_connector_logical_id = source_logical_id + connector_logical_id
# can't use sam_expect since this is neither a property nor a resource attribute
if not isinstance(connector_dict, dict):
raise InvalidResourceException(
full_connector_logical_id,
f"{source_logical_id}.{full_connector_logical_id} should be a map.",
)
generated_connector = self._get_generated_connector(
source_logical_id,
full_connector_logical_id,
connector_logical_id,
connector_dict,
)
if not verify_unique_logical_id(generated_connector, resources):
raise DuplicateLogicalIdException(
source_logical_id, full_connector_logical_id, generated_connector.resource_type
)
connectors.append(generated_connector)
except (InvalidResourceException, DuplicateLogicalIdException) as e:
self.document_errors.append(e)
return connectors
def _get_generated_connector(
self,
source_logical_id: str,
full_connector_logical_id: str,
connector_logical_id: str,
connector_dict: Dict[str, Any],
) -> Resource:
"""
Generates the connector resource from the embedded connector
:param str source_logical_id: Logical id of the resource the connector is attached to
:param str full_connector_logical_id: source_logical_id + connector_logical_id
:param str connector_logical_id: Logical id of the connector defined by the user
:param dict connector_dict: The properties of the connector including the Destination, Permissions and optionally the SourceReference
:return: The generated SAMConnector resource
"""
connector = copy.deepcopy(connector_dict)
connector["Type"] = SamConnector.resource_type
properties = sam_expect(
connector.get("Properties"),
source_logical_id,
f"Connectors.{connector_logical_id}.Properties",
is_resource_attribute=True,
).to_be_a_map()
properties["Source"] = {"Id": source_logical_id}
if "SourceReference" in properties:
source_reference = sam_expect(
properties.get("SourceReference"),
source_logical_id,
f"Connectors.{connector_logical_id}.Properties.SourceReference",
).to_be_a_map()
# can't allow user to override the Id using SourceReference
if "Id" in source_reference:
raise InvalidResourceException(connector_logical_id, "'Id' shouldn't be defined in 'SourceReference'.")
properties["Source"].update(source_reference)
del properties["SourceReference"]
return SamConnector.from_dict(full_connector_logical_id, connector)
def prepare_plugins(plugins: Optional[List[BasePlugin]], parameters: Optional[Dict[str, Any]] = None) -> SamPlugins:
"""
Creates & returns a plugins object with the given list of plugins installed. In addition to the given plugins,
we will also install a few "required" plugins that are necessary to provide complete support for SAM template spec.
:param plugins: list of samtranslator.plugins.BasePlugin plugins: List of plugins to install
:param parameters: Dictionary of parameter values
:return samtranslator.plugins.SamPlugins: Instance of `SamPlugins`
"""
if parameters is None:
parameters = {}
required_plugins = [
DefaultDefinitionBodyPlugin(),
make_implicit_rest_api_plugin(),
make_implicit_http_api_plugin(),
GlobalsPlugin(),
make_policy_template_for_function_plugin(),
]
plugins = plugins or []
# If a ServerlessAppPlugin does not yet exist, create one and add to the beginning of the required plugins list.
if not any(isinstance(plugin, ServerlessAppPlugin) for plugin in plugins):
required_plugins.insert(0, ServerlessAppPlugin(parameters=parameters))
# Execute customer's plugins first before running SAM plugins. It is very important to retain this order because
# other plugins will be dependent on this ordering.
return SamPlugins(plugins + required_plugins)
if TYPE_CHECKING:
from samtranslator.plugins.api.implicit_http_api_plugin import ImplicitHttpApiPlugin
from samtranslator.plugins.api.implicit_rest_api_plugin import ImplicitRestApiPlugin
def make_implicit_rest_api_plugin() -> "ImplicitRestApiPlugin":
# This is necessary to prevent a circular dependency on imports when loading package
from samtranslator.plugins.api.implicit_rest_api_plugin import ImplicitRestApiPlugin
return ImplicitRestApiPlugin()
def make_implicit_http_api_plugin() -> "ImplicitHttpApiPlugin":
# This is necessary to prevent a circular dependency on imports when loading package
from samtranslator.plugins.api.implicit_http_api_plugin import ImplicitHttpApiPlugin
return ImplicitHttpApiPlugin()
def make_policy_template_for_function_plugin() -> PolicyTemplatesForResourcePlugin:
"""
Constructs an instance of policy templates processing plugin using default policy templates JSON data
:return plugins.policies.policy_templates_plugin.PolicyTemplatesForResourcePlugin: Instance of the plugin
"""
policy_templates = PolicyTemplatesProcessor.get_default_policy_templates_json()
processor = PolicyTemplatesProcessor(policy_templates)
return PolicyTemplatesForResourcePlugin(processor)