Skip to content

Commit

Permalink
Switch to Path for tracking all things path
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed May 16, 2024
1 parent fe01f61 commit 2a4a460
Show file tree
Hide file tree
Showing 68 changed files with 314 additions and 333 deletions.
2 changes: 1 addition & 1 deletion src/cfnlint/context/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__all__ = ["Context", "create_context_for_template"]

from cfnlint.context.context import Context, create_context_for_template
from cfnlint.context.context import Context, Path, create_context_for_template
116 changes: 68 additions & 48 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,64 @@ def has_language_extensions_transform(self):
return bool(lang_extensions_transform in self._transforms)


@dataclass(frozen=True)
class Path:
"""
A `Path` keeps track of the different Path values
"""

# path keeps track of the path as we move down the template
# Example: Resources, MyResource, Properties, Name, ...
path: Deque[str | int] = field(init=True, default_factory=deque)

# value_path is an override of the value if we got it from another place
# like a Parameter default value
# Example: Parameters, MyParameter, Default, ...
value_path: Deque[str | int] = field(init=True, default_factory=deque)

# cfn_path is a generic path used by cfn-lint to help make
# writing rules easier. The resource name is replaced by the type
# lists are replaced with a *
# Example: Resources, AWS::S3::Bucket, Properties, Name, ...
# Example: Resources, *, Type
cfn_path: Deque[str] = field(init=True, default_factory=deque)

def descend(self, **kwargs):
"""
Create a new Path by appending values
"""
cls = self.__class__

for f in fields(Path):
if f.init:
if kwargs.get(f.name) is not None:
kwargs[f.name] = getattr(self, f.name) + deque([kwargs[f.name]])
else:
kwargs[f.name] = getattr(self, f.name)

return cls(**kwargs)

def evolve(self, **kwargs):
"""
Create a new path without appending values
"""
cls = self.__class__

for f in fields(Path):
if f.init:
kwargs.setdefault(f.name, getattr(self, f.name))

return cls(**kwargs)

@property
def path_string(self):
return "/".join(str(p) for p in self.path)

@property
def cfn_path_string(self):
return "/".join(self.cfn_path)


@dataclass(frozen=True)
class Context:
"""
Expand All @@ -61,14 +119,7 @@ class Context:
# supported functions at this point in the template
functions: Sequence[str] = field(init=True, default_factory=list)

# path keeps track of the path as we move down the template
# Example: Resources, MyResource, Properties, Name, ...
path: Deque[str | int] = field(init=True, default_factory=deque)
# value_path is an override of the value if we got it from another place
# like a Parameter default value
value_path: Deque[str | int] = field(init=True, default_factory=deque)

cfn_path: Deque[str | int] = field(init=True, default_factory=deque)
path: Path = field(init=True, default_factory=Path)

# cfn-lint Template class
parameters: Dict[str, "Parameter"] = field(init=True, default_factory=dict)
Expand All @@ -95,50 +146,17 @@ def __post_init__(self) -> None:
if self.path is None:
self.path = deque([])

def descend(
self,
path: Sequence[str | int] = (),
ref_values: Dict[str, Any] | None = None,
value_path: Sequence[str | int] = (),
cfn_path: Sequence[str | int] = (),
regions: Sequence[str] | None = None,
):
def evolve(self, **kwargs) -> "Context":
"""
Create a new context merging together attributes
Create a new context without merging together attributes
"""
cls = self.__class__

kwargs: dict[str, Any] = {}

if value_path:
kwargs["value_path"] = deque(value_path)

if path:
kwargs["path"] = self.path + deque(path)

if cfn_path:
kwargs["cfn_path"] = self.cfn_path + deque(cfn_path)

if ref_values is not None:
if "ref_values" in kwargs:
new_ref_values = self.ref_values.copy()
new_ref_values.update(ref_values)
new_ref_values.update(kwargs["ref_values"])
kwargs["ref_values"] = new_ref_values

if regions is not None:
kwargs["regions"] = list(regions)

for f in fields(Context):
if f.init:
kwargs.setdefault(f.name, getattr(self, f.name))

return cls(**kwargs)

def evolve(self, **kwargs) -> "Context":
"""
Create a new context without merging together attributes
"""
cls = self.__class__

for f in fields(Context):
if f.init:
kwargs.setdefault(f.name, getattr(self, f.name))
Expand All @@ -161,8 +179,10 @@ def ref_value(self, instance: str) -> Iterator[Tuple[str | List[str], "Context"]
return
if instance in self.parameters:
for v, path in self.parameters[instance].ref(self):
yield v, self.descend(
value_path=deque(["Parameters", instance]) + path,
yield v, self.evolve(
path=self.path.evolve(
value_path=deque(["Parameters", instance]) + path
),
ref_values={instance: v},
)
return
Expand Down Expand Up @@ -478,6 +498,6 @@ def create_context_for_template(cfn):
transforms=transforms,
mappings=mappings,
regions=cfn.regions,
path=deque([]),
path=Path(),
functions=["Fn::Transform"],
)
16 changes: 11 additions & 5 deletions src/cfnlint/jsonschema/_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from dataclasses import dataclass, field, fields
from typing import Any, Sequence, Tuple
from typing import TYPE_CHECKING, Any, Sequence, Tuple

from cfnlint.helpers import REGEX_DYN_REF, ToPy
from cfnlint.jsonschema._utils import ensure_list

if TYPE_CHECKING:
from cfnlint.jsonschema.protocols import Validator


_all_types = ["array", "boolean", "integer", "number", "object", "string"]


Expand Down Expand Up @@ -53,7 +59,7 @@ class FunctionFilter:
default=True,
)

def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:
def _filter_schemas(self, schema, validator: Validator) -> Tuple[Any, Any]:
"""
Filter the schemas to only include the ones that are required
"""
Expand All @@ -64,8 +70,8 @@ def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:
# Example: Typically we want to remove (!Ref AWS::NoValue)
# to count minItems, maxItems, required properties but if we
# are in an If we need to be more strict
if len(validator.context.path) > 0:
if validator.context.path[-1] in ["Fn::If"]:
if len(validator.context.path.path) > 0:
if validator.context.path.path[-1] in ["Fn::If"]:
return schema, None

standard_schema = {}
Expand All @@ -78,7 +84,7 @@ def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:

if self.add_cfn_lint_keyword and "$ref" not in standard_schema:
standard_schema["cfnLint"] = ensure_list(standard_schema.get("cfnLint", []))
standard_schema["cfnLint"].append("/".join(validator.context.cfn_path))
standard_schema["cfnLint"].append("/".join(validator.context.path.cfn_path))

# some times CloudFormation dumps to standard nested "json".
# it will do by using {"type": "object"} with no properties
Expand Down
7 changes: 3 additions & 4 deletions src/cfnlint/jsonschema/_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# Code is taken from jsonschema package and adapted CloudFormation use
# https://github.com/python-jsonschema/jsonschema

from collections import deque
from copy import deepcopy
from difflib import SequenceMatcher
from typing import Any, Dict, Sequence
Expand Down Expand Up @@ -547,7 +546,7 @@ def type(
yield ValidationError(f"{instance!r} is not of type {reprs}")


def prefixItems(validator, prefixItems, instance, schema):
def prefixItems(validator: Validator, prefixItems: Any, instance: Any, schema: Any):
if not validator.is_type(instance, "array"):
return

Expand All @@ -556,6 +555,6 @@ def prefixItems(validator, prefixItems, instance, schema):
instance=item,
schema=subschema,
schema_path=index,
path=deque([index]),
property_path=deque(["*"]),
path=index,
property_path="*",
)
40 changes: 29 additions & 11 deletions src/cfnlint/jsonschema/_resolvers_cfn.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
if "DefaultValue" in options:
for value, v, _ in validator.resolve_value(options["DefaultValue"]):
yield value, v.evolve(
context=v.context.evolve(value_path=deque([4, "DefaultValue"]))
context=v.context.evolve(
path=v.context.path.evolve(
value_path=deque([4, "DefaultValue"])
)
),
), None
default_value = value

Expand All @@ -66,7 +70,9 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
not (equal(map_name, each)) for each in mappings
):
yield None, map_v.evolve(
context=map_v.context.evolve(value_path=deque([0]))
context=map_v.context.evolve(
path=map_v.context.path.evolve(value_path=deque([0])),
),
), ValidationError(
f"{map_name!r} is not one of {mappings!r}", path=[0]
)
Expand All @@ -79,7 +85,9 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
not (equal(top_level_key, each)) for each in top_level_keys
):
yield None, top_v.evolve(
context=top_v.context.evolve(value_path=deque([1]))
context=top_v.context.evolve(
path=top_v.context.path.evolve(value_path=deque([1])),
),
), ValidationError(
f"{top_level_key!r} is not one of {top_level_keys!r}",
path=[0],
Expand All @@ -96,7 +104,11 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
for each in second_level_keys
):
yield None, second_v.evolve(
context=second_v.context.evolve(value_path=deque([2]))
context=second_v.context.evolve(
path=second_v.context.path.evolve(
value_path=deque([2])
),
),
), ValidationError(
f"{second_level_key!r} is not one of {second_level_keys!r}",
path=[0],
Expand All @@ -111,12 +123,16 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
value,
validator.evolve(
context=validator.context.evolve(
value_path=[
"Mappings",
map_name,
top_level_key,
second_level_key,
]
path=validator.context.path.evolve(
value_path=deque(
[
"Mappings",
map_name,
top_level_key,
second_level_key,
]
)
)
)
),
None,
Expand Down Expand Up @@ -298,7 +314,9 @@ def if_(validator: Validator, instance: Any) -> ResolutionResult:
yield (
value,
v.evolve(
context=v.context.evolve(value_path=deque([i])),
context=v.context.evolve(
path=v.context.path.evolve(value_path=deque([i])),
),
),
err,
)
Expand Down
2 changes: 1 addition & 1 deletion src/cfnlint/jsonschema/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Validator(Protocol):
cfn: Template | None
context: Context
function_filter: FunctionFilter
cfn_path: deque[str | int]
cfn_path: deque[str]

def __init__(
self,
Expand Down
10 changes: 6 additions & 4 deletions src/cfnlint/jsonschema/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,15 @@ def descend(
schema: Any,
path: str | int | None = None,
schema_path: str | int | None = None,
property_path: str | int | None = None,
property_path: str | None = None,
) -> ValidationResult:
for error in self.evolve(
schema=schema,
context=self.context.descend(
path=[path] if path else [],
cfn_path=[property_path] if property_path else [],
context=self.context.evolve(
path=self.context.path.descend(
path=path,
cfn_path=property_path,
),
),
).iter_errors(instance):
if path is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class DynamicReferenceSecretsManagerPath(CloudFormationLintRule):
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
if len(validator.context.path) >= 3:
if len(validator.context.path.path) >= 3:
if (
validator.context.path[0] == "Resources"
and validator.context.path[2] == "Properties"
validator.context.path.path[0] == "Resources"
and validator.context.path.path[2] == "Properties"
):
return

Expand Down
5 changes: 3 additions & 2 deletions src/cfnlint/rules/functions/DynamicReferenceSecureString.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def __init__(self):
]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
keyword = self.get_keyword(validator)
if keyword in self.exceptions:
print(1, validator.context.path.cfn_path)
print(validator.context.path.cfn_path_string)
if validator.context.path.cfn_path_string in self.exceptions:
return

yield ValidationError(
Expand Down
Loading

0 comments on commit 2a4a460

Please sign in to comment.