diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/__init__.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/__init__.py index 7b4bc9161a43..72c77310cf9e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/__init__.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/__init__.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore +__path__ = __import__("pkgutil").extend_path(__path__, __name__) from azure.ai.ml.dsl._pipeline_decorator import pipeline diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_component_func.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_component_func.py index 2922427eb527..085c935ce261 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_component_func.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_component_func.py @@ -4,7 +4,7 @@ # pylint: disable=protected-access -from typing import Callable, Mapping +from typing import Any, Callable, List, Mapping from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters from azure.ai.ml.entities import Component as ComponentEntity @@ -12,7 +12,7 @@ from azure.ai.ml.entities._component.datatransfer_component import DataTransferImportComponent -def get_dynamic_input_parameter(inputs: Mapping): +def get_dynamic_input_parameter(inputs: Mapping) -> List: """Return the dynamic parameter of the definition's input ports. :param inputs: The mapping of input names to input objects. @@ -31,7 +31,7 @@ def get_dynamic_input_parameter(inputs: Mapping): ] -def get_dynamic_source_parameter(source): +def get_dynamic_source_parameter(source: Any) -> List: """Return the dynamic parameter of the definition's source port. :param source: The source object. @@ -49,7 +49,7 @@ def get_dynamic_source_parameter(source): ] -def to_component_func(entity: ComponentEntity, component_creation_func) -> Callable[..., Command]: +def to_component_func(entity: ComponentEntity, component_creation_func: Callable) -> Callable[..., Command]: """Convert a ComponentEntity to a callable component function. :param entity: The ComponentEntity to convert. @@ -97,6 +97,7 @@ def to_component_func(entity: ComponentEntity, component_creation_func) -> Calla flattened_group_keys=flattened_group_keys, ) - dynamic_func._func_calling_example = example - dynamic_func._has_parameters = bool(all_params) + # Bug Item number: 2883188 + dynamic_func._func_calling_example = example # type: ignore + dynamic_func._has_parameters = bool(all_params) # type: ignore return dynamic_func diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_do_while.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_do_while.py index f45f59bf51cf..3ae598ee0363 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_do_while.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_do_while.py @@ -1,11 +1,18 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- +from typing import Dict, Optional, Union + +from azure.ai.ml.entities._builders import Command from azure.ai.ml.entities._builders.do_while import DoWhile +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._inputs_outputs import Output from azure.ai.ml.entities._job.pipeline._io import NodeOutput -def do_while(body, mapping, max_iteration_count: int, condition=None): +def do_while( + body: Union[Pipeline, Command], mapping: Dict, max_iteration_count: int, condition: Optional[Output] = None +) -> DoWhile: """Build a do_while node by specifying the loop body, output-input mapping, and termination condition. .. remarks:: @@ -63,7 +70,7 @@ def pipeline_with_do_while_node(): ) do_while_node.set_limits(max_iteration_count=max_iteration_count) - def _infer_and_update_body_input_from_mapping(): + def _infer_and_update_body_input_from_mapping() -> None: # pylint: disable=protected-access for source_output, body_input in mapping.items(): # handle case that mapping key is a NodeOutput diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_dynamic.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_dynamic.py index 0476ff8e3a4c..a58d76ba6b17 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_dynamic.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_dynamic.py @@ -4,7 +4,7 @@ import logging import types from inspect import Parameter, Signature -from typing import Callable, Dict, Sequence +from typing import Any, Callable, Dict, Sequence, cast from azure.ai.ml.entities import Component from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UnexpectedKeywordError, ValidationException @@ -26,7 +26,9 @@ class KwParameter(Parameter): :type _optional: bool """ - def __init__(self, name, default, annotation=Parameter.empty, _type="str", _optional=False) -> None: + def __init__( + self, name: str, default: Any, annotation: Any = Parameter.empty, _type: str = "str", _optional: bool = False + ) -> None: super().__init__(name, Parameter.KEYWORD_ONLY, default=default, annotation=annotation) self._type = _type self._optional = _optional @@ -54,23 +56,25 @@ def _replace_function_name(func: types.FunctionType, new_name: str) -> types.Fun else: # Before python<3.8, replace is not available, we can only initialize the code as following. # https://github.com/python/cpython/blob/v3.7.8/Objects/codeobject.c#L97 - code = types.CodeType( + + # Bug Item number: 2881688 + code = types.CodeType( # type: ignore code_template.co_argcount, code_template.co_kwonlyargcount, code_template.co_nlocals, code_template.co_stacksize, code_template.co_flags, - code_template.co_code, - code_template.co_consts, + code_template.co_code, # type: ignore + code_template.co_consts, # type: ignore code_template.co_names, code_template.co_varnames, - code_template.co_filename, + code_template.co_filename, # type: ignore new_name, # Use the new name for the new code object. - code_template.co_firstlineno, - code_template.co_lnotab, + code_template.co_firstlineno, # type: ignore + code_template.co_lnotab, # type: ignore # The following two values are required for closures. - code_template.co_freevars, - code_template.co_cellvars, + code_template.co_freevars, # type: ignore + code_template.co_cellvars, # type: ignore ) # Initialize a new function with the code object and the new name, see the following ref for more details. # https://github.com/python/cpython/blob/4901fe274bc82b95dc89bcb3de8802a3dfedab32/Objects/clinic/funcobject.c.h#L30 @@ -89,7 +93,7 @@ def _replace_function_name(func: types.FunctionType, new_name: str) -> types.Fun # pylint: disable-next=docstring-missing-param -def _assert_arg_valid(kwargs: dict, keys: list, func_name: str): +def _assert_arg_valid(kwargs: dict, keys: list, func_name: str) -> None: """Assert the arg keys are all in keys.""" # pylint: disable=protected-access # validate component input names @@ -114,7 +118,7 @@ def _assert_arg_valid(kwargs: dict, keys: list, func_name: str): kwargs[lower2original_parameter_names[key.lower()]] = kwargs.pop(key) -def _update_dct_if_not_exist(dst: Dict, src: Dict): +def _update_dct_if_not_exist(dst: Dict, src: Dict) -> None: """Computes the union of `src` and `dst`, in-place within `dst` If a key exists in `dst` and `src` the value in `dst` is preserved @@ -162,7 +166,7 @@ def create_kw_function_from_parameters( ) default_kwargs = {p.name: p.default for p in parameters} - def f(**kwargs): + def f(**kwargs: Any) -> Any: # We need to make sure all keys of kwargs are valid. # Merge valid group keys with original keys. _assert_arg_valid(kwargs, [*list(default_kwargs.keys()), *flattened_group_keys], func_name=func_name) @@ -170,9 +174,10 @@ def f(**kwargs): _update_dct_if_not_exist(kwargs, default_kwargs) return func(**kwargs) - f = _replace_function_name(f, func_name) + f = _replace_function_name(cast(types.FunctionType, f), func_name) # Set the signature so jupyter notebook could have param hint by calling inspect.signature() - f.__signature__ = Signature(parameters) + # Bug Item number: 2883223 + f.__signature__ = Signature(parameters) # type: ignore # Set doc/name/module to make sure help(f) shows following expected result. # Expected help(f): # @@ -183,5 +188,5 @@ def f(**kwargs): f.__doc__ = documentation # Set documentation to update FUNC_DOC in help. # Set module = None to avoid showing the sentence `in module 'azure.ai.ml.component._dynamic' in help.` # See https://github.com/python/cpython/blob/2145c8c9724287a310bc77a2760d4f1c0ca9eb0c/Lib/pydoc.py#L1757 - f.__module__ = None + f.__module__ = None # type: ignore return f diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_fl_scatter_gather_node.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_fl_scatter_gather_node.py index 55985cdebf1d..b66412102b89 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_fl_scatter_gather_node.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_fl_scatter_gather_node.py @@ -5,7 +5,7 @@ import importlib # pylint: disable=protected-access -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from azure.ai.ml._utils._experimental import experimental from azure.ai.ml.entities import CommandComponent, PipelineJob @@ -13,7 +13,7 @@ from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather -def _check_for_import(package_name): +def _check_for_import(package_name: str) -> None: try: # pylint: disable=unused-import importlib.import_module(package_name) @@ -31,16 +31,16 @@ def fl_scatter_gather( silo_configs: List[FederatedLearningSilo], silo_component: Union[PipelineJob, CommandComponent], aggregation_component: Union[PipelineJob, CommandComponent], - aggregation_compute: str = None, - aggregation_datastore: str = None, + aggregation_compute: Optional[str] = None, + aggregation_datastore: Optional[str] = None, shared_silo_kwargs: Optional[Dict] = None, aggregation_kwargs: Optional[Dict] = None, silo_to_aggregation_argument_map: Optional[Dict] = None, aggregation_to_silo_argument_map: Optional[Dict] = None, max_iterations: int = 1, _create_default_mappings_if_needed: bool = False, - **kwargs, -): + **kwargs: Any, +) -> FLScatterGather: """A federated learning scatter-gather subgraph node. It's assumed that this will be used inside of a `@pipeline`-decorated function in order to create a subgraph which diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_group_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_group_decorator.py index fff8f6fbec03..16d1adbc1cc0 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_group_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_group_decorator.py @@ -7,7 +7,7 @@ # Attribute on customized group class to mark a value type as a group of inputs/outputs. import _thread import functools -from typing import Any, Callable, Dict, List, Type, TypeVar, Union +from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union from azure.ai.ml import Input, Output from azure.ai.ml.constants._component import IOConstants @@ -145,12 +145,12 @@ def parent_func(params: ParentClass): def _create_fn( name: str, - args: List[str], - body: List[str], + args: Union[List, str], + body: Union[List, str], *, - globals: Dict[str, Any] = None, - locals: Dict[str, Any] = None, - return_type: Type[T2], + globals: Optional[Dict[str, Any]] = None, + locals: Optional[Dict[str, Any]] = None, + return_type: Optional[Type[T2]], ) -> Callable[..., T2]: """To generate function in class. @@ -188,9 +188,10 @@ def _create_fn( txt = f" def {name}({args}){return_annotation}:\n{body}" local_vars = ", ".join(locals.keys()) txt = f"def __create_fn__({local_vars}):\n{txt}\n return {name}" - ns = {} + ns: Dict = {} exec(txt, globals, ns) # pylint: disable=exec-used # nosec - return ns["__create_fn__"](**locals) + res: Callable = ns["__create_fn__"](**locals) + return res def _create_init_fn( # pylint: disable=unused-argument cls: Type[T], fields: Dict[str, Union[Annotation, Input, Output]] @@ -207,7 +208,7 @@ def _create_init_fn( # pylint: disable=unused-argument # Reference code link: # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L523 - def _get_data_type_from_annotation(anno: Input): + def _get_data_type_from_annotation(anno: Any) -> Any: if isinstance(anno, GroupInput): return anno._group_class # keep original annotation for Outputs @@ -220,7 +221,7 @@ def _get_data_type_from_annotation(anno: Input): # otherwise, keep original annotation return anno - def _get_default(key): + def _get_default(key: str) -> Any: # will set None as default value when default not exist so won't need to reorder the init params val = fields[key] if hasattr(val, "default"): @@ -254,20 +255,22 @@ def _create_repr_fn(fields: Dict[str, Union[Annotation, Input, Output]]) -> Call # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L582 fn = _create_fn( "__repr__", - ("self",), + [ + "self", + ], ['return self.__class__.__qualname__ + f"(' + ", ".join([f"{k}={{self.{k}!r}}" for k in fields]) + ')"'], return_type=str, ) # This function's logic is copied from "recursive_repr" function in # reprlib module to avoid dependency. - def _recursive_repr(user_function): + def _recursive_repr(user_function: Any) -> Any: # Decorator to make a repr function return "..." for a recursive # call. repr_running = set() @functools.wraps(user_function) - def wrapper(self): + def wrapper(self: Any) -> Any: key = id(self), _thread.get_ident() if key in repr_running: return "..." @@ -280,7 +283,8 @@ def wrapper(self): return wrapper - return _recursive_repr(fn) + res: Callable = _recursive_repr(fn) + return res def _process_class(cls: Type[T], all_fields: Dict[str, Union[Annotation, Input, Output]]) -> Type[T]: setattr(cls, "__init__", _create_init_fn(cls, all_fields)) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_load_import.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_load_import.py index 23dcd9b8c27a..6793c96ab9b6 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_load_import.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_load_import.py @@ -4,7 +4,7 @@ # pylint: disable=protected-access -from typing import Callable +from typing import Any, Callable from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY from azure.ai.ml.entities._builders import Command @@ -12,7 +12,7 @@ # pylint: disable=unused-argument -def to_component(*, job: ComponentTranslatableMixin, **kwargs) -> Callable[..., Command]: +def to_component(*, job: ComponentTranslatableMixin, **kwargs: Any) -> Callable[..., Command]: """Translate a job object to a component function, provided job should be able to translate to a component. For example: @@ -41,4 +41,5 @@ def to_component(*, job: ComponentTranslatableMixin, **kwargs) -> Callable[..., # set default base path as "./". Because if code path is relative path and base path is None, will raise error when # get arm id of Code - return job._to_component(context={BASE_PATH_CONTEXT_KEY: Path("./")}) + res: Callable = job._to_component(context={BASE_PATH_CONTEXT_KEY: Path("./")}) + return res diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_mldesigner/__init__.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_mldesigner/__init__.py index 7e3eedb09e9a..2732cd75e8a5 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_mldesigner/__init__.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_mldesigner/__init__.py @@ -8,19 +8,19 @@ original function/module names the same as before, otherwise mldesigner will be broken by this change. """ -__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore +__path__ = __import__("pkgutil").extend_path(__path__, __name__) -from azure.ai.ml.entities._component.component_factory import component_factory -from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function -from azure.ai.ml.entities._inputs_outputs import _get_param_with_standard_annotation +from azure.ai.ml._internal.entities import InternalComponent from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes from azure.ai.ml._utils._asset_utils import get_ignore_file from azure.ai.ml._utils.utils import try_enable_internal_components -from azure.ai.ml._internal.entities import InternalComponent from azure.ai.ml.dsl._condition import condition from azure.ai.ml.dsl._do_while import do_while -from azure.ai.ml.dsl._parallel_for import parallel_for, ParallelFor from azure.ai.ml.dsl._group_decorator import group +from azure.ai.ml.dsl._parallel_for import ParallelFor, parallel_for +from azure.ai.ml.entities._component.component_factory import component_factory +from azure.ai.ml.entities._inputs_outputs import _get_param_with_standard_annotation +from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function from ._constants import V1_COMPONENT_TO_NODE diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_overrides_definition.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_overrides_definition.py index 54a74319cbd0..6460ec6f7f56 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_overrides_definition.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_overrides_definition.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -from typing import Mapping +from typing import Mapping, Optional class OverrideDefinition(dict): @@ -11,7 +11,7 @@ class OverrideDefinition(dict): def get_override_definition_from_schema( schema: str, # pylint: disable=unused-argument -) -> Mapping[str, OverrideDefinition]: +) -> Optional[Mapping[str, OverrideDefinition]]: """Ger override definition from a json schema. :param schema: Json schema of component job. diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_parallel_for.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_parallel_for.py index 030d0a70f292..5b4bc8a41c2b 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_parallel_for.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_parallel_for.py @@ -1,10 +1,16 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- +from typing import Any, Dict, List, Union + +from azure.ai.ml.entities._builders import BaseNode from azure.ai.ml.entities._builders.parallel_for import ParallelFor +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput -def parallel_for(*, body, items, **kwargs): +def parallel_for( + *, body: BaseNode, items: Union[List, Dict, str, PipelineInput, NodeOutput], **kwargs: Any +) -> ParallelFor: """Build a parallel for loop by specifying the loop body and input items. .. remarks:: diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index 9cbd975fa095..90d7f36f23ea 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -9,7 +9,7 @@ import typing from collections import OrderedDict from inspect import Parameter, signature -from typing import Any, Callable, Dict, Union +from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union from azure.ai.ml._utils._func_utils import get_outputs_and_locals from azure.ai.ml._utils.utils import is_valid_node_name, parse_args_description_from_docstring @@ -19,6 +19,7 @@ from azure.ai.ml.entities import PipelineJob from azure.ai.ml.entities._builders import BaseNode from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode +from azure.ai.ml.entities._component.component import Component from azure.ai.ml.entities._component.pipeline_component import PipelineComponent from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group @@ -37,20 +38,20 @@ class _PipelineComponentBuilderStack: - def __init__(self): - self.items = [] + def __init__(self) -> None: + self.items: List["PipelineComponentBuilder"] = [] - def top(self) -> "PipelineComponentBuilder": + def top(self) -> Optional["PipelineComponentBuilder"]: if self.is_empty(): return None return self.items[-1] - def pop(self) -> "PipelineComponentBuilder": + def pop(self) -> Optional["PipelineComponentBuilder"]: if self.is_empty(): return None return self.items.pop() - def push(self, item): + def push(self, item: object) -> None: error_msg = f"{self.__class__.__name__} only " f"allows pushing `{PipelineComponentBuilder.__name__}` element" assert isinstance(item, PipelineComponentBuilder), error_msg @@ -66,10 +67,10 @@ def push(self, item): no_personal_data_message=msg.format("[current_pipeline]", _BUILDER_STACK_MAX_DEPTH), ) - def is_empty(self): + def is_empty(self) -> bool: return len(self.items) == 0 - def size(self): + def size(self) -> int: return len(self.items) @@ -86,10 +87,11 @@ def _is_inside_dsl_pipeline_func() -> bool: return _definition_builder_stack.size() > 0 -def _add_component_to_current_definition_builder(component): +def _add_component_to_current_definition_builder(component: Union[BaseNode, AutoMLJob]) -> None: if _is_inside_dsl_pipeline_func(): builder = _definition_builder_stack.top() - builder.add_node(component) + if builder is not None: + builder.add_node(component) class PipelineComponentBuilder: @@ -106,24 +108,22 @@ class PipelineComponentBuilder: def __init__( self, func: Callable, - name=None, - version=None, - display_name=None, - description=None, - default_datastore=None, - tags=None, - source_path=None, - non_pipeline_inputs=None, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + default_datastore: Any = None, + tags: Optional[Union[Dict[str, str], str]] = None, + source_path: Optional[str] = None, + non_pipeline_inputs: Optional[List] = None, ): self.func = func - name = name if name else func.__name__ + name = name if name is not None else func.__name__ display_name = display_name if display_name else name description = description if description else func.__doc__ self._args_description = parse_args_description_from_docstring(func.__doc__) - if name is None: - name = func.__name__ # List of nodes, order by it's creation order in pipeline. - self.nodes = [] + self.nodes: List = [] self.non_pipeline_parameter_names = non_pipeline_inputs or [] # A dict of inputs name to InputDefinition. # TODO: infer pipeline component input meta from assignment @@ -146,7 +146,7 @@ def name(self) -> str: """ return self._name - def add_node(self, node: Union[BaseNode, AutoMLJob]): + def add_node(self, node: Union[BaseNode, AutoMLJob]) -> None: """Add node to pipeline builder. :param node: A pipeline node. @@ -155,7 +155,11 @@ def add_node(self, node: Union[BaseNode, AutoMLJob]): self.nodes.append(node) def build( - self, *, user_provided_kwargs=None, non_pipeline_inputs_dict=None, non_pipeline_inputs=None + self, + *, + user_provided_kwargs: Optional[Dict] = None, + non_pipeline_inputs_dict: Optional[Dict] = None, + non_pipeline_inputs: Optional[List] = None, ) -> PipelineComponent: """Build a pipeline component from current pipeline builder. @@ -208,7 +212,7 @@ def build( pipeline_component._outputs = self._build_pipeline_outputs(outputs) return pipeline_component - def _validate_group_annotation(self, name: str, val: GroupInput): + def _validate_group_annotation(self, name: str, val: GroupInput) -> None: for k, v in val.values.items(): if isinstance(v, GroupInput): self._validate_group_annotation(k, v) @@ -222,8 +226,10 @@ def _validate_group_annotation(self, name: str, val: GroupInput): else: raise UserErrorException(f"Unsupported annotation type {type(v)} for group field {name}.{k}") - def _build_inputs(self, func): - inputs = _get_param_with_standard_annotation(func, is_func=True, skip_params=self.non_pipeline_parameter_names) + def _build_inputs(self, func: Union[Callable, Type]) -> Dict: + inputs: Dict = _get_param_with_standard_annotation( + func, is_func=True, skip_params=self.non_pipeline_parameter_names + ) for k, v in inputs.items(): if isinstance(v, GroupInput): self._validate_group_annotation(name=k, val=v) @@ -269,8 +275,8 @@ def _map_internal_output_type(_meta: Output) -> str: :rtype: str """ if type(_meta).__name__ != "InternalOutput": - return _meta.type - return _meta.map_pipeline_output_type() + return str(_meta.type) + return str(_meta.map_pipeline_output_type()) # Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding. output_meta = Output(type=_map_internal_output_type(meta), description=meta.description, mode=meta.mode) @@ -296,7 +302,7 @@ def _map_internal_output_type(_meta: Output) -> str: self._validate_inferred_outputs(output_meta_dict, output_dict) return output_dict - def _get_group_parameter_defaults(self): + def _get_group_parameter_defaults(self) -> Dict: group_defaults = {} for key, val in self.inputs.items(): if not isinstance(val, GroupInput): @@ -336,7 +342,7 @@ def _update_nodes_variable_names(self, func_variables: dict) -> Dict[str, Union[ :rtype: Dict[str, Union[BaseNode, AutoMLJob]] """ - def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]): + def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]) -> Optional[Union[str, Component]]: # TODO(1979547): refactor this if isinstance(node, AutoMLJob): return node.name or _sanitize_python_variable_name(node.__class__.__name__) @@ -398,7 +404,7 @@ def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]): name_count_dict[target_name] = 0 name_count_dict[target_name] += 1 suffix = "" if name_count_dict[target_name] == 1 else f"_{name_count_dict[target_name] - 1}" - id_name_dict[_id] = f"{_sanitize_python_variable_name(target_name)}{suffix}" + id_name_dict[_id] = f"{_sanitize_python_variable_name(str(target_name))}{suffix}" final_name = id_name_dict[_id] node.name = final_name result[final_name] = node @@ -407,7 +413,7 @@ def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]): self._validate_keyword_in_node_io(node) return result - def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]): + def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]) -> None: """Update the pipeline inputs by the dict. :param pipeline_inputs: The pipeline inputs @@ -461,7 +467,7 @@ def _get_output_annotation(cls, func: Callable) -> Dict[str, Dict]: output_annotations[key] = val._to_dict() return output_annotations - def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]): + def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]) -> None: """Validate inferred output dict against annotation. :param output_meta_dict: The output meta dict @@ -504,7 +510,7 @@ def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[s raise UserErrorException(f"{error_prefix}: {unmatched_outputs}") @staticmethod - def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]): + def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]) -> None: if has_attr_safe(node, "inputs"): for input_name in set(node.inputs) & COMPONENT_IO_KEYWORDS: module_logger.warning( @@ -527,7 +533,13 @@ def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]): ) -def _build_pipeline_parameter(func, *, user_provided_kwargs, group_default_kwargs=None, non_pipeline_inputs=None): +def _build_pipeline_parameter( + func: Optional[Callable], + *, + user_provided_kwargs: Dict, + group_default_kwargs: Optional[Dict] = None, + non_pipeline_inputs: Optional[List] = None, +) -> Dict: # Pass group defaults into kwargs to support group.item can be used even if no default on function. # example: # @group @@ -549,7 +561,7 @@ def _build_pipeline_parameter(func, *, user_provided_kwargs, group_default_kwarg } ) - def all_params(parameters): + def all_params(parameters: Any) -> Generator: for value in parameters.values(): yield value @@ -576,7 +588,9 @@ def all_params(parameters): return transformed_kwargs -def _wrap_pipeline_parameter(key, default_value, actual_value, group_names=None): +def _wrap_pipeline_parameter( + key: str, default_value: Any, actual_value: Any, group_names: Optional[List[str]] = None +) -> Union[_GroupAttrDict, PipelineInput]: # Append parameter path in group group_names = [*group_names] if group_names else [] if isinstance(default_value, _GroupAttrDict): diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index c4e881468504..03d87598b8c8 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -10,7 +10,7 @@ from functools import wraps from inspect import Parameter, signature from pathlib import Path -from typing import Callable, Dict, List, Optional, TypeVar, Union, overload +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, overload from typing_extensions import ParamSpec @@ -57,7 +57,7 @@ # Overload the returns a decorator when func is None @overload def pipeline( - func: None = None, + func: None, *, name: Optional[str] = None, version: Optional[str] = None, @@ -65,7 +65,7 @@ def pipeline( description: Optional[str] = None, experiment_name: Optional[str] = None, tags: Optional[Dict[str, str]] = None, - **kwargs, + **kwargs: Any, ) -> Callable[[Callable[P, T]], Callable[P, PipelineJob]]: ... @@ -73,7 +73,7 @@ def pipeline( # Overload the returns a decorated function when func isn't None @overload def pipeline( - func: Callable[P, T] = None, + func: Callable[P, T], *, name: Optional[str] = None, version: Optional[str] = None, @@ -81,7 +81,7 @@ def pipeline( description: Optional[str] = None, experiment_name: Optional[str] = None, tags: Optional[Dict[str, str]] = None, - **kwargs, + **kwargs: Any, ) -> Callable[P, PipelineJob]: ... @@ -94,8 +94,8 @@ def pipeline( display_name: Optional[str] = None, description: Optional[str] = None, experiment_name: Optional[str] = None, - tags: Optional[Dict[str, str]] = None, - **kwargs, + tags: Optional[Union[Dict[str, str], str]] = None, + **kwargs: Any, ) -> Union[Callable[[Callable[P, T]], Callable[P, PipelineJob]], Callable[P, PipelineJob]]: """Build a pipeline which contains all component nodes defined in this function. @@ -135,7 +135,8 @@ def pipeline( """ def pipeline_decorator(func: Callable[P, T]) -> Callable[P, PipelineJob]: - if not isinstance(func, Callable): # pylint: disable=isinstance-second-argument-not-valid-type + # pylint: disable=isinstance-second-argument-not-valid-type + if not isinstance(func, Callable): # type: ignore raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") non_pipeline_inputs = kwargs.get("non_pipeline_inputs", []) or kwargs.get("non_pipeline_parameters", []) @@ -244,9 +245,10 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> PipelineJob: return built_pipeline - wrapper._is_dsl_func = True - wrapper._job_settings = job_settings - wrapper._pipeline_builder = pipeline_builder + # Bug Item number: 2883169 + wrapper._is_dsl_func = True # type: ignore + wrapper._job_settings = job_settings # type: ignore + wrapper._pipeline_builder = pipeline_builder # type: ignore return wrapper # enable use decorator without "()" if all arguments are default values @@ -256,7 +258,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> PipelineJob: # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype -def _validate_args(func, args, kwargs, non_pipeline_inputs): +def _validate_args(func: Callable, args: Any, kwargs: Dict, non_pipeline_inputs: List) -> OrderedDict: """Validate customer function args and convert them to kwargs.""" if not isinstance(non_pipeline_inputs, List) or any(not isinstance(param, str) for param in non_pipeline_inputs): msg = "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" @@ -297,7 +299,7 @@ def _validate_args(func, args, kwargs, non_pipeline_inputs): raise MultipleValueError(func.__name__, _k) provided_args[_k] = _v - def _is_supported_data_type(_data): + def _is_supported_data_type(_data: object) -> bool: return isinstance(_data, SUPPORTED_INPUT_TYPES) or is_group(_data) for pipeline_input_name in provided_args: diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_settings.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_settings.py index de8060a24467..8a6d62266125 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_settings.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_settings.py @@ -3,7 +3,7 @@ # --------------------------------------------------------- import logging from collections import deque -from typing import Dict, Optional, Union +from typing import Deque, Dict, Optional, Union from azure.ai.ml.entities._builders import BaseNode from azure.ai.ml.exceptions import UserErrorException @@ -12,10 +12,10 @@ class _DSLSettingsStack: - def __init__(self): - self._stack = deque() + def __init__(self) -> None: + self._stack: Deque["_DSLSettings"] = deque() - def push(self): + def push(self) -> None: self._stack.append(_DSLSettings()) def pop(self) -> "_DSLSettings": @@ -47,16 +47,16 @@ class _DSLSettings: Store settings from `dsl.set_pipeline_settings` during pipeline definition. """ - def __init__(self): - self._init_job = None - self._finalize_job = None + def __init__(self) -> None: + self._init_job: Optional[Union[BaseNode, str]] = None + self._finalize_job: Optional[Union[BaseNode, str]] = None @property def init_job(self) -> BaseNode: return self._init_job @init_job.setter - def init_job(self, value: Union[BaseNode, str]): + def init_job(self, value: Optional[Union[BaseNode, str]]) -> None: # pylint: disable=logging-fstring-interpolation if isinstance(value, (BaseNode, str)): self._init_job = value @@ -68,7 +68,7 @@ def init_job_set(self) -> bool: # note: need to use `BaseNode is not None` as `bool(BaseNode)` will return False return self._init_job is not None - def init_job_name(self, jobs: Dict[str, BaseNode]) -> str: + def init_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]: if isinstance(self._init_job, str): return self._init_job for name, job in jobs.items(): @@ -82,7 +82,7 @@ def finalize_job(self) -> BaseNode: return self._finalize_job @finalize_job.setter - def finalize_job(self, value: Union[BaseNode, str]): + def finalize_job(self, value: Optional[Union[BaseNode, str]]) -> None: # pylint: disable=logging-fstring-interpolation if isinstance(value, (BaseNode, str)): self._finalize_job = value diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_utils.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_utils.py index 0db57f19e3ae..ad7b183d34d6 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_utils.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_utils.py @@ -10,13 +10,13 @@ import sys import types from pathlib import Path -from typing import Iterable, Optional, Union +from typing import Generator, Optional, Union from azure.ai.ml.dsl._constants import VALID_NAME_CHARS from azure.ai.ml.exceptions import ComponentException, ErrorCategory, ErrorTarget -def _normalize_identifier_name(name): +def _normalize_identifier_name(name: str) -> str: normalized_name = name.lower() normalized_name = re.sub(r"[\W_]", " ", normalized_name) # No non-word characters normalized_name = re.sub(" +", " ", normalized_name).strip() # No double spaces, leading or trailing spaces @@ -25,7 +25,7 @@ def _normalize_identifier_name(name): return normalized_name -def _sanitize_python_variable_name(name: str): +def _sanitize_python_variable_name(name: str) -> str: return _normalize_identifier_name(name).replace(" ", "_") @@ -68,7 +68,7 @@ def _resolve_source_file() -> Optional[Path]: pattern, last_frame.frame ): module = inspect.getmodule(last_frame.frame) - return Path(module.__file__).absolute() if module else None + return Path(str(module.__file__)).absolute() if module else None except Exception: # pylint: disable=broad-except pass return None @@ -123,7 +123,7 @@ def _relative_to( @contextlib.contextmanager -def inject_sys_path(path): +def inject_sys_path(path: object) -> Generator: original_sys_path = sys.path.copy() sys.path.insert(0, str(path)) try: @@ -132,19 +132,19 @@ def inject_sys_path(path): sys.path = original_sys_path -def _force_reload_module(module): +def _force_reload_module(module: types.ModuleType) -> types.ModuleType: # Reload the module except the case that module.__spec__ is None. # In the case module.__spec__ is None (E.g. module is __main__), reload will raise exception. if module.__spec__ is None: return module - path = Path(module.__spec__.loader.path).parent + path = Path(module.__spec__.loader.path).parent # type: ignore with inject_sys_path(path): return importlib.reload(module) @contextlib.contextmanager # pylint: disable-next=docstring-missing-return,docstring-missing-rtype -def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> Iterable[None]: +def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> Generator: """Context manager for changing the current working directory. :param path: The path to change to @@ -163,7 +163,9 @@ def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> It os.chdir(saved_path) -def _import_component_with_working_dir(module_name, working_dir=None, force_reload=False): +def _import_component_with_working_dir( + module_name: str, working_dir: Optional[str] = None, force_reload: bool = False +) -> types.ModuleType: if working_dir is None: working_dir = os.getcwd() working_dir = str(Path(working_dir).resolve().absolute()) @@ -188,7 +190,7 @@ def _import_component_with_working_dir(module_name, working_dir=None, force_relo error=e, error_category=ErrorCategory.USER_ERROR, ) from e - loaded_module_file = Path(py_module.__file__).resolve().absolute().as_posix() + loaded_module_file = Path(str(py_module.__file__)).resolve().absolute().as_posix() posix_working_dir = Path(working_dir).absolute().as_posix() if _relative_to(loaded_module_file, posix_working_dir) is None: if force_reload: @@ -204,7 +206,7 @@ def _import_component_with_working_dir(module_name, working_dir=None, force_relo @contextlib.contextmanager -def environment_variable_overwrite(key, val): +def environment_variable_overwrite(key: str, val: str) -> Generator: if key in os.environ: backup_value = os.environ[key] else: diff --git a/sdk/ml/azure-ai-ml/pyproject.toml b/sdk/ml/azure-ai-ml/pyproject.toml index 6fdb8b7a5cd2..0a41d493bf95 100644 --- a/sdk/ml/azure-ai-ml/pyproject.toml +++ b/sdk/ml/azure-ai-ml/pyproject.toml @@ -45,7 +45,6 @@ exclude = [ "azure/ai/ml/_utils", "azure/ai/ml/constants/", "azure/ai/ml/data_transfer/", - "azure/ai/ml/dsl", "azure/ai/ml/entities/", "azure/ai/ml/identity/", "azure/ai/ml/operations/",