Skip to content

Commit

Permalink
update for new dpg model (Azure#34849)
Browse files Browse the repository at this point in the history
  • Loading branch information
msyyc authored Apr 2, 2024
1 parent e65220d commit 2739e0d
Showing 1 changed file with 65 additions and 14 deletions.
79 changes: 65 additions & 14 deletions tools/azure-sdk-tools/packaging_tools/code_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import types
import tempfile
import re
from typing import Dict, Any, Optional, List
from typing import Dict, Any, Optional, List, ForwardRef

# Because I'm subprocessing myself, I need to do weird thing as import.
try:
Expand Down Expand Up @@ -41,6 +41,10 @@ def create_empty_report():
return {"client": {}, "models": {"enums": {}, "exceptions": {}, "models": {}}, "operations": {}}


def is_model(model_cls: object, is_new_model: bool) -> bool:
return hasattr(model_cls, "_is_model" if is_new_model else "_attribute_map")


def create_report(module_name: str) -> Dict[str, Any]:
module_to_generate = importlib.import_module(module_name)
client_name = getattr(module_to_generate, "__all__")
Expand All @@ -54,14 +58,15 @@ def create_report(module_name: str) -> Dict[str, Any]:

# Look for models first
model_names = [model_name for model_name in dir(module_to_generate.models) if model_name[0].isupper()]
is_new_model = hasattr(module_to_generate, "_model_base")
for model_name in model_names:
model_cls = getattr(module_to_generate.models, model_name)
if hasattr(model_cls, "_attribute_map"):
report["models"]["models"][model_name] = create_model_report(model_cls)
if is_model(model_cls, is_new_model):
report["models"]["models"][model_name] = create_model_report(model_cls, is_new_model)
elif issubclass(model_cls, Exception): # If not, might be an exception
report["models"]["exceptions"][model_name] = create_model_report(model_cls)
report["models"]["exceptions"][model_name] = create_model_report(model_cls, is_new_model)
else:
report["models"]["enums"][model_name] = create_model_report(model_cls)
report["models"]["enums"][model_name] = create_model_report(model_cls, is_new_model)
# Look for operation groups
try:
operations_classes = [op_name for op_name in dir(module_to_generate.operations) if op_name[0].isupper()]
Expand All @@ -83,22 +88,69 @@ def create_report(module_name: str) -> Dict[str, Any]:
return report


def create_model_report(model_cls):
def get_attr_map(model_cls: object, is_new_model: bool) -> Dict[str, Any]:
if is_new_model:
return getattr(model_cls(), "_attr_to_rest_field")
return getattr(model_cls, "_attribute_map")


def get_type_annotation(model_cls: object, attribute: str) -> List[str]:
# make sure to get the annotations from the base class
mros = model_cls.__mro__[:-3][::-1]
annotations = {
k: v
for mro_class in mros
if hasattr(mro_class, "__annotations__") # pylint: disable=no-member
for k, v in mro_class.__annotations__.items() # pylint: disable=no-member
}
attr_type = annotations.get(attribute)
type_list = getattr(attr_type, "__args__", [attr_type])
return sorted(
[
item.__forward_arg__.replace("_models.", "")
if isinstance(item, ForwardRef)
else getattr(item, "__name__", str(item))
for item in type_list
]
)


def get_type(model_cls: object, attribute: str, conf: Dict[str, Any], is_new_model: bool) -> str:
if is_new_model:
return " or ".join(filter(lambda x: x != "NoneType", get_type_annotation(model_cls, attribute)))
return conf["type"]


def _get_validation(model_cls: object, attribute: str) -> Dict[str, Any]:
return getattr(model_cls, "_validation", {}).get(attribute, {})


def is_required(model_cls: object, attribute: str, is_new_model: bool) -> bool:
if is_new_model:
return "NoneType" not in get_type_annotation(model_cls, attribute)
return _get_validation(model_cls, attribute).get("required", False)


def is_readonly(model_cls: object, attribute: str, is_new_model: bool) -> bool:
if is_new_model:
return getattr(getattr(model_cls, "_attr_to_rest_field").get(attribute), "_visibility") == ["read"]
return _get_validation(model_cls, attribute).get("readonly", False)


def create_model_report(model_cls: object, is_new_model: bool):
result = {
"name": model_cls.__name__,
}
# If _attribute_map, it's a model
if hasattr(model_cls, "_attribute_map"):
if is_model(model_cls, is_new_model):
result["type"] = "Model"
for attribute, conf in model_cls._attribute_map.items():
attribute_validation = getattr(model_cls, "_validation", {}).get(attribute, {})

for attribute, conf in get_attr_map(model_cls, is_new_model).items():
result.setdefault("parameters", {})[attribute] = {
"name": attribute,
"properties": {
"type": conf["type"],
"required": attribute_validation.get("required", False),
"readonly": attribute_validation.get("readonly", False),
"type": get_type(model_cls, attribute, conf, is_new_model),
"required": is_required(model_cls, attribute, is_new_model),
"readonly": is_readonly(model_cls, attribute, is_new_model),
},
}
elif issubclass(model_cls, Exception): # If not, might be an exception
Expand Down Expand Up @@ -190,7 +242,6 @@ def main(
metadata_path: Optional[str] = None,
last_pypi_stable: bool = False,
):

output_msg = output if output else "default folder"
_LOGGER.info(
f"Building code report of {input_parameter} for version {version} in {output_msg} ({no_venv}/{pypi}/{last_pypi})"
Expand Down

0 comments on commit 2739e0d

Please sign in to comment.