Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/schema/releases #2

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/cd-release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Build the generated JSON schema packages and attach them to a GitHub
# release whenever a tag is pushed.
name: "[CD] Create release"
on:
  push:
    tags:
      - "*"

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"
      # NOTE(review): this runs the script directly, which requires a
      # shebang and the executable bit. Also confirm the path — this PR
      # adds scripts/release_schemas/generator.py, not
      # create_cfn_schema_rule.py.
      - name: Build
        run: scripts/release_schemas/create_cfn_schema_rule.py
      # Attach the archives produced under build/ to the tag's release.
      # NOTE(review): despite the .zip names, the generator writes gzipped
      # tarballs (tarfile "w:gz") — confirm consumers expect that.
      - name: Release
        uses: softprops/action-gh-release@v2
        if: startsWith(github.ref, 'refs/tags/')
        with:
          files: |
            build/schemas-cfnlint.zip
            build/schemas-draft7.zip
95 changes: 95 additions & 0 deletions scripts/release_schemas/_translator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

# Translate cfn-lint unique keywords into json schema keywords
import logging
from collections import deque
from typing import Any, Iterator

from cfnlint.schema import PROVIDER_SCHEMA_MANAGER

logger = logging.getLogger(__name__)


def required_xor(properties: list[str]) -> dict[str, list[Any]]:
    """Translate a cfn-lint ``requiredXor`` keyword into draft-07 ``oneOf``.

    Exactly one of *properties* must be present, which is expressed as a
    ``oneOf`` over single-property ``required`` schemas.
    """
    alternatives = []
    for name in properties:
        alternatives.append({"required": [name]})
    return {"oneOf": alternatives}


def dependent_excluded(properties: dict[str, list[str]]) -> dict[str, list[Any]]:
    """Translate a cfn-lint ``dependentExcluded`` keyword into draft-07.

    For each property, the presence of that property forbids the presence
    of every property listed against it, expressed with ``dependencies``.

    Args:
        properties: Map of property name -> names it excludes.

    Returns:
        A ``dependencies`` schema expressing the mutual exclusions.
    """
    dependencies: dict[str, Any] = {"dependencies": {}}
    for prop, exclusions in properties.items():
        dependencies["dependencies"][prop] = {"not": {"anyOf": []}}
        for exclusion in exclusions:
            # JSON Schema requires "required" to be an ARRAY of strings;
            # the previous bare string produced an invalid schema.
            dependencies["dependencies"][prop]["not"]["anyOf"].append(
                {"required": [exclusion]}
            )

    return dependencies


# Dispatch table: cfn-lint specific keyword name -> converter that rewrites
# it into an equivalent standard JSON Schema (draft-07) construct.
_keywords = {
    "requiredXor": required_xor,
    "dependentExcluded": dependent_excluded,
}


def _find_keywords(schema: Any) -> Iterator[deque[str | int]]:
    """Yield the location of every cfn-lint specific keyword in *schema*.

    Each yielded deque holds the path segments from the schema root down
    to the keyword, with the keyword name and its value as the final two
    elements. Subtrees under a matched keyword are not descended into.
    """
    if isinstance(schema, dict):
        for name, subschema in schema.items():
            if name in _keywords:
                yield deque([name, subschema])
                continue
            for found in _find_keywords(subschema):
                found.appendleft(name)
                yield found
    elif isinstance(schema, list):
        for index, item in enumerate(schema):
            for found in _find_keywords(item):
                found.appendleft(index)
                yield found


def translator(resource_type: str, region: str) -> None:
    """Rewrite cfn-lint specific keywords in a resource schema in place.

    Finds every ``requiredXor``/``dependentExcluded`` occurrence in the
    provider schema, removes it, and appends its draft-07 equivalent
    (via ``_keywords``) to an ``allOf`` at the same location.
    """
    # Materialize the generator up front: the schema is mutated below
    # while iterating over the discovered keyword paths.
    keywords = list(
        _find_keywords(
            PROVIDER_SCHEMA_MANAGER.get_resource_schema(
                region=region, resource_type=resource_type
            ).schema
        )
    )

    for keyword in keywords:
        # Each path ends with [..., keyword-name, keyword-value].
        value = keyword.pop()
        key = keyword.pop()
        if not keyword:
            path = ""
        else:
            path = f"/{'/'.join(str(k) for k in keyword)}"

        # NOTE(review): per RFC 6902, "add" on an existing object member
        # replaces it — if two translated keywords ever share the same
        # path, this second pass would reset the allOf created by the
        # first. Confirm that cannot happen for the released schemas.
        patch = [
            {
                "op": "add",
                "path": f"{path}/allOf",
                "value": [],
            }
        ]

        logger.info(f"Patch {resource_type} add allOf for {key}")
        # Patching through the private _schemas cache mutates the schema
        # that subsequent get_resource_schema() calls return.
        PROVIDER_SCHEMA_MANAGER._schemas[region][resource_type].patch(patches=patch)

        # Drop the cfn-lint keyword and append its translation to allOf.
        patch = [
            {
                "op": "remove",
                "path": f"{path}/{key}",
            },
            {"op": "add", "path": f"{path}/allOf/-", "value": _keywords[key](value)},  # type: ignore
        ]

        logger.info(f"Patch {resource_type} replace for {key}")
        PROVIDER_SCHEMA_MANAGER._schemas[region][resource_type].patch(patches=patch)
176 changes: 176 additions & 0 deletions scripts/release_schemas/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#!/usr/bin/env python
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import logging
import tarfile
from collections import deque
from pathlib import Path

import _translator

from cfnlint.helpers import REGIONS, ToPy, format_json_string, load_plugins
from cfnlint.schema import PROVIDER_SCHEMA_MANAGER

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def _get_schema_path(schema, path):
s = schema.schema
schema_path = deque([])
while path:
key = path.popleft()
if key == "*":
schema_path.append("items")
s = s["items"]
else:
s = s["properties"][key]
schema_path.extend(["properties", key])

pointer = s.get("$ref")
if pointer:
_, s = schema.resolver.resolve(pointer)
schema_path = deque(pointer.split("/")[1:])

return schema_path


def _build_patch(path, patch):
if not path:
path_str = "/allOf"
else:
path_str = f"/{'/'.join(path)}/allOf"

return (
[
{
"op": "add",
"path": path_str,
"value": [],
}
],
[
{
"op": "add",
"path": f"{path_str}/-",
"value": patch,
}
],
)


# Map of resource type -> the region whose schema will be released for it.
schemas = {}

##########################
#
# Build the definitive list of all resource types across all regions
#
###########################

# us-east-1 is scanned first so it wins as the canonical region whenever a
# resource type exists in multiple regions; other regions only contribute
# types us-east-1 does not have.
for region in ["us-east-1"] + list((set(REGIONS) - set(["us-east-1"]))):
    for resource_type in PROVIDER_SCHEMA_MANAGER.get_resource_types(region):
        # Pseudo resource types with no provider schema worth releasing.
        if resource_type in ["AWS::CDK::Metadata", "Module"]:
            continue
        if resource_type not in schemas:
            schemas[resource_type] = region


##########################
#
# Merge in rule schemas into the resource schemas
#
###########################

rules_folder = Path("src") / "cfnlint" / "rules"

# Load every CfnLintJsonSchema-based rule so its schema snippet can be
# merged into the matching resource provider schema.
rules = load_plugins(
    rules_folder,
    name="CfnLintJsonSchema",
    modules=(
        "cfnlint.rules.jsonschema.CfnLintJsonSchema",
        "cfnlint.rules.jsonschema.CfnLintJsonSchema.CfnLintJsonSchema",
    ),
)

for rule in rules:
    # Skip region-specific rules: their schemas only hold for some regions
    # and cannot be merged into a single region-agnostic schema.
    # (Bug fix: the original compared the base CLASS OBJECT to a string,
    # which is always False — compare its qualified name instead.)
    base = rule.__class__.__base__
    if f"{base.__module__}.{base.__name__}" == (
        "cfnlint.rules.jsonschema."
        "CfnLintJsonSchemaRegional.CfnLintJsonSchemaRegional"
    ):
        continue
    if not rule.id or rule.schema == {}:
        continue

    for keyword in rule.keywords:
        if not keyword.startswith("Resources/"):
            continue
        path = deque(keyword.split("/"))

        # A usable keyword looks like Resources/<Type>/Properties/<...>.
        if len(path) < 3:
            continue

        path.popleft()  # discard the leading "Resources"
        resource_type = path.popleft()
        resource_properties = path.popleft()
        # Bug fix: must be "or" — with "and", an unknown resource type
        # targeting Properties fell through and raised KeyError on
        # schemas[resource_type] below.
        if resource_type not in schemas or resource_properties != "Properties":
            continue

        schema_path = _get_schema_path(
            PROVIDER_SCHEMA_MANAGER.get_resource_schema(
                schemas[resource_type], resource_type
            ),
            path,
        )
        # First create an empty allOf at the target, then append the
        # rule's schema snippet to it.
        all_of_patch, schema_patch = _build_patch(schema_path, rule.schema)

        PROVIDER_SCHEMA_MANAGER._schemas[schemas[resource_type]][resource_type].patch(
            patches=all_of_patch
        )
        PROVIDER_SCHEMA_MANAGER._schemas[schemas[resource_type]][resource_type].patch(
            patches=schema_patch
        )

        logger.info(f"Patch {rule.id} for {resource_type} in {schemas[resource_type]}")


build_dir = Path("build")
schemas_dir = build_dir / "schemas"
schemas_cfnlint_dir = schemas_dir / "cfnlint"
schemas_cfnlint_dir.mkdir(parents=True, exist_ok=True)

schemas_draft7_dir = schemas_dir / "draft7"
schemas_draft7_dir.mkdir(parents=True, exist_ok=True)

for resource_type, region in schemas.items():
    rt_py = ToPy(resource_type)

    # Snapshot the schema with the cfn-lint specific keywords intact.
    cfnlint_schema = PROVIDER_SCHEMA_MANAGER.get_resource_schema(
        region, resource_type
    ).schema
    (schemas_cfnlint_dir / f"{rt_py.py}.json").write_text(
        format_json_string(cfnlint_schema)
    )

    # Rewrite the cfn-lint keywords into plain draft-07 constructs in
    # place, then snapshot the translated schema.
    _translator.translator(resource_type, region)

    draft7_schema = PROVIDER_SCHEMA_MANAGER.get_resource_schema(
        region, resource_type
    ).schema
    (schemas_draft7_dir / f"{rt_py.py}.json").write_text(
        format_json_string(draft7_schema)
    )

logger.info("Create schema package")
# NOTE(review): these archives are gzipped tarballs despite the .zip
# suffix — confirm downstream consumers expect tar.gz.
with tarfile.open(build_dir / "schemas-cfnlint.zip", "w:gz") as tar:
    tar.add(schemas_cfnlint_dir, arcname="schemas")

with tarfile.open(build_dir / "schemas-draft7.zip", "w:gz") as tar:
    tar.add(schemas_draft7_dir, arcname="schemas")
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
"Subnets": {
"minItems": 2
}
},
"requiredXor": [
"Subnets",
"SubnetMappings"
]
}
}
}
Loading