Skip to content

Commit

Permalink
Load registry schemas if type match (#3450)
Browse files Browse the repository at this point in the history
* Load registry schemas if type match
  • Loading branch information
kddejong authored Jul 2, 2024
1 parent 36ff520 commit 2e2034c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 34 deletions.
3 changes: 3 additions & 0 deletions src/cfnlint/rules/resources/PrimaryIdentifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def match(self, cfn: Template) -> RuleMatches:
# by the customer so if any primary identifiers are read
# only we have to skip evaluation
primary_ids = schema.schema.get("primaryIdentifier", [])
if not primary_ids:
continue

read_only_ids = schema.schema.get("readOnlyProperties", [])

if any(id in read_only_ids for id in primary_ids):
Expand Down
64 changes: 33 additions & 31 deletions src/cfnlint/schema/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,47 +152,49 @@ def get_resource_schema(self, region: str, resource_type: str) -> Schema:
Returns:
dict: returns the schema
"""
resource_type = self._normalize_resource_type(resource_type)
if resource_type not in self._registry_schemas:
resource_type = self._normalize_resource_type(resource_type)

if resource_type in self._removed_types:
raise ResourceNotFoundError(resource_type, region)

reg = ToPy(region)
rt = ToPy(resource_type)

schema = self._schemas[reg.name].get(resource_type)
if schema is None:
# dynamically import the modules as needed
self._provider_schema_modules[reg.name] = __import__(
f"{self._root.module}.{reg.py}", fromlist=[""]
)
# check cfn-lint provided schemas
if resource_type in self._registry_schemas:
self._schemas[reg.name][rt.name] = self._registry_schemas[rt.name]
return self._schemas[reg.name][rt.name]

# load the schema
if f"{rt.provider}.json" in self._provider_schema_modules[reg.name].cached:
schema_cached = copy(
self.get_resource_schema(
region=self._region_primary.name,
resource_type=rt.name,
)
schema = self._schemas[reg.name].get(rt.name)
if schema is not None:
return schema

# dynamically import the modules as needed
self._provider_schema_modules[reg.name] = __import__(
f"{self._root.module}.{reg.py}", fromlist=[""]
)
# check cfn-lint provided schemas
if rt.name in self._registry_schemas:
self._schemas[reg.name][rt.name] = self._registry_schemas[rt.name]
return self._schemas[reg.name][rt.name]

# load the schema
if f"{rt.provider}.json" in self._provider_schema_modules[reg.name].cached:
schema_cached = copy(
self.get_resource_schema(
region=self._region_primary.name,
resource_type=rt.name,
)
schema_cached.is_cached = True
self._schemas[reg.name][rt.name] = schema_cached
return self._schemas[reg.name][rt.name]
try:
self._schemas[reg.name][rt.name] = Schema(
load_resource(
self._provider_schema_modules[reg.name],
filename=f"{rt.provider}.json",
)
)
schema_cached.is_cached = True
self._schemas[reg.name][rt.name] = schema_cached
return self._schemas[reg.name][rt.name]
try:
self._schemas[reg.name][rt.name] = Schema(
load_resource(
self._provider_schema_modules[reg.name],
filename=f"{rt.provider}.json",
)
except Exception as e:
raise ResourceNotFoundError(rt.name, region) from e
)
return self._schemas[reg.name][rt.name]
return schema
except Exception as e:
raise ResourceNotFoundError(rt.name, region) from e

@lru_cache(maxsize=None)
def get_resource_types(self, region: str) -> list[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,11 @@ Resources:
- !Ref 'AWS::NoValue'
BadType:
Type: !Ref AWS::Region
Module1:
Type: MyCompany::MODULE
Properties:
Attribute1: test
Module2:
Type: MyCompany::MODULE
Properties:
Attribute2: test
12 changes: 12 additions & 0 deletions test/unit/module/schema/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,15 @@ def test_removed_types(self):

with self.assertRaises(ResourceNotFoundError):
self.manager.get_resource_schema(region, rt)

def test_type_normalization(self):

rt = "MyCompany::MODULE"
schema = self.manager.get_resource_schema("us-east-1", rt)

assert schema.schema.get("typeName") == "Module"

self.manager.get_resource_schema.cache_clear()
self.manager._registry_schemas[rt] = True
schema = self.manager.get_resource_schema("us-east-1", rt)
assert schema is True
4 changes: 1 addition & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ isolated_build = true
[testenv]
skip_install = True
commands =
pip install -e .
pip install -e .[sarif]
pip install -e .[junit]
pip install -e .[full]
coverage run -m pytest {posargs:test}
coverage xml
deps =
Expand Down

0 comments on commit 2e2034c

Please sign in to comment.