diff --git a/flytekit/__init__.py b/flytekit/__init__.py index c934a6fa79..968e3153eb 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -217,6 +217,7 @@ from importlib.metadata import entry_points from flytekit._version import __version__ +from flytekit.configuration import Config from flytekit.core.array_node_map_task import map_task from flytekit.core.artifact import Artifact from flytekit.core.base_sql_task import SQLTask @@ -250,8 +251,11 @@ from flytekit.models.documentation import Description, Documentation, SourceCode from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar from flytekit.models.types import LiteralType +from flytekit.remote import FlyteRemote from flytekit.sensor.sensor_engine import SensorEngine from flytekit.types import directory, file, iterator +from flytekit.types.directory import FlyteDirectory +from flytekit.types.file import FlyteFile from flytekit.types.structured.structured_dataset import ( StructuredDataset, StructuredDatasetFormat, diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index b2c93d64d2..57c7168f36 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -18,7 +18,7 @@ from rich.progress import Progress, TextColumn, TimeElapsedColumn from typing_extensions import get_origin -from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, Literal, WorkflowExecutionPhase +from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, LaunchPlan, Literal, WorkflowExecutionPhase from flytekit.clis.sdk_in_container.helpers import ( parse_copy, patch_image_config, @@ -369,13 +369,25 @@ class Entities(typing.NamedTuple): workflows: typing.List[str] tasks: typing.List[str] + launch_plans: typing.List[typing.Tuple[str, str]] # LP is stored as a tuple of name, the variable name in the file def all(self) -> typing.List[str]: e = [] e.extend(self.workflows) e.extend(self.tasks) + for i in self.launch_plans: + e.append(i[0]) return e + def matching_lp(self, lp_name: str) -> typing.Optional[str]: + """ + Returns the variable name of the launch plan in the file + """ + for i in self.launch_plans: + if i[0] == lp_name: + return i[1] + return None + def get_entities_in_file(filename: pathlib.Path, should_delete: bool) -> Entities: """ @@ -393,6 +405,7 @@ def get_entities_in_file(filename: pathlib.Path, should_delete: bool) -> Entitie workflows = [] tasks = [] + launch_plans = [] module = importlib.import_module(module_name) for name in dir(module): o = module.__dict__[name] @@ -400,10 +413,17 @@ def get_entities_in_file(filename: pathlib.Path, should_delete: bool) -> Entitie workflows.append(name) elif isinstance(o, PythonTask): tasks.append(name) + elif isinstance(o, LaunchPlan): + varname = name + if o.name: + # name refers to the variable name, while o.name refers to the launch plan name if the user has + # specified one + name = o.name + launch_plans.append((varname, name)) if should_delete and os.path.exists(filename): os.remove(filename) - return Entities(workflows, tasks) + return Entities(workflows, tasks, launch_plans) def to_click_option( @@ -592,7 +612,7 @@ def is_optional(_type): return typing.get_origin(_type) is typing.Union and type(None) in typing.get_args(_type) -def run_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask]): +def run_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask, LaunchPlan]): """ Returns a function that is used to implement WorkflowCommand and execute a flyte workflow. """ @@ -604,7 +624,11 @@ def _run(*args, **kwargs): # By the time we get to this function, all the loading has already happened run_level_params: RunLevelParams = ctx.obj - entity_type = "workflow" if isinstance(entity, PythonFunctionWorkflow) else "task" + entity_type = "workflow" + if isinstance(entity, LaunchPlan): + entity_type = "launch plan" + elif isinstance(entity, PythonTask): + entity_type = "task" logger.debug(f"Running {entity_type} {entity.name} with input {kwargs}") click.secho( @@ -981,7 +1005,7 @@ def __init__(self, filename: str, *args, **kwargs): else: self._filename = pathlib.Path(filename).resolve() self._should_delete = False - self._entities = None + self._entities: typing.Optional[Entities] = None def list_commands(self, ctx): if self._entities: @@ -995,8 +1019,8 @@ def _create_command( ctx: click.Context, entity_name: str, run_level_params: RunLevelParams, - loaded_entity: [PythonTask, WorkflowBase], - is_workflow: bool, + loaded_entity: [PythonTask, WorkflowBase, LaunchPlan], + entity_type: str, ): """ Delegate that creates the command for a given entity. @@ -1014,10 +1038,12 @@ def _create_command( required = type(None) not in get_args(python_type) and default_val is None params.append(to_click_option(ctx, flyte_ctx, input_name, literal_var, python_type, default_val, required)) - entity_type = "Workflow" if is_workflow else "Task" h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})" - if loaded_entity.__doc__: - h = h + click.style(f"{loaded_entity.__doc__}", dim=True) + if isinstance(loaded_entity, LaunchPlan): + h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow") + else: + if loaded_entity.__doc__: + h = h + click.style(f"{loaded_entity.__doc__}", dim=True) cmd = YamlFileReadingCommand( name=entity_name, params=params, @@ -1036,9 +1062,14 @@ def get_command(self, ctx, exe_entity): function. :return: """ - is_workflow = False + entity_type = "task" if self._entities: - is_workflow = exe_entity in self._entities.workflows + if exe_entity in self._entities.workflows: + entity_type = "workflow" + else: + lp_name = self._entities.matching_lp(exe_entity) + if lp_name: + entity_type = "launch plan" if not os.path.exists(self._filename): click.secho(f"File {self._filename} does not exist.", fg="red") exit(1) @@ -1062,7 +1093,7 @@ def get_command(self, ctx, exe_entity): entity = load_naive_entity(module, exe_entity, project_root) - return self._create_command(ctx, exe_entity, run_level_params, entity, is_workflow) + return self._create_command(ctx, exe_entity, run_level_params, entity, entity_type) class RunCommand(click.RichGroup): @@ -1106,7 +1137,7 @@ def get_command(self, ctx, filename): return RemoteEntityGroup(RemoteEntityGroup.WORKFLOW_COMMAND) elif filename == RemoteEntityGroup.TASK_COMMAND: return RemoteEntityGroup(RemoteEntityGroup.TASK_COMMAND) - return WorkflowCommand(filename, name=filename, help=f"Run a [workflow|task] from {filename}") + return WorkflowCommand(filename, name=filename, help=f"Run a [workflow|task|launch plan] from {filename}") _run_help = """ diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index b284a4c6e4..607518f0fb 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -743,7 +743,7 @@ def dispatch_execute( raise except Exception as e: if is_local_execution: - e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e.args[0]}",) + e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e}",) raise raise FlyteNonRecoverableSystemException(e) from e # TODO: Logger should auto inject the current context information to indicate if the task is running within @@ -755,7 +755,7 @@ def dispatch_execute( except Exception as e: if is_local_execution: # If the task is being executed locally, we want to raise the original exception - e.args = (f"Error encountered while executing '{self.name}':\n {e.args[0]}",) + e.args = (f"Error encountered while executing '{self.name}':\n {e}",) raise raise FlyteUserRuntimeException(e) from e diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 13691162d5..59dfc91a94 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -346,6 +346,7 @@ def __getattr__(self, item: str) -> str: def __init__(self, secrets_cfg: typing.Optional[SecretsConfig] = None): if secrets_cfg is None: secrets_cfg = SecretsConfig.auto() + self._base_dir = secrets_cfg.default_dir.strip() self._file_prefix = secrets_cfg.file_prefix.strip() self._env_prefix = secrets_cfg.env_prefix.strip() @@ -373,11 +374,22 @@ def get( if not get_plugin().secret_requires_group(): group, group_version = None, None - env_var = self.get_secrets_env_var(group, key, group_version) + env_prefixes = [self._env_prefix] + + # During local execution check for the key without a prefix + ctx = FlyteContextManager.current_context() + if ctx.execution_state is None or ctx.execution_state.is_local_execution(): + env_prefixes.append("") + + for env_prefix in env_prefixes: + env_var = self._get_secrets_env_var( + group=group, key=key, group_version=group_version, env_prefix=env_prefix + ) + v = os.environ.get(env_var) + if v is not None: + return v.strip() + fpath = self.get_secrets_file(group, key, group_version) - v = os.environ.get(env_var) - if v is not None: - return v.strip() if os.path.exists(fpath): with open(fpath, encode_mode) as f: return f.read().strip() @@ -392,8 +404,17 @@ def get_secrets_env_var( """ Returns a string that matches the ENV Variable to look for the secrets """ + return self._get_secrets_env_var(group=group, key=key, group_version=group_version, env_prefix=self._env_prefix) + + def _get_secrets_env_var( + self, + group: Optional[str] = None, + key: Optional[str] = None, + group_version: Optional[str] = None, + env_prefix: str = "", + ): l = [k.upper() for k in filter(None, (group, group_version, key))] - return f"{self._env_prefix}{'_'.join(l)}" + return f"{env_prefix}{'_'.join(l)}" def get_secrets_file( self, group: Optional[str] = None, key: Optional[str] = None, group_version: Optional[str] = None diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 78690fe9e2..745f452a83 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -302,6 +302,13 @@ def foo2(): Possible options for secret stores are - Vault, Confidant, Kube secrets, AWS KMS etc Refer to :py:class:`Secret` to understand how to specify the request for a secret. It may change based on the backend provider. + + .. note:: + + During local execution, the secrets will be pulled from the local environment variables + with the format `{GROUP}_{GROUP_VERSION}_{KEY}`, where all the characters are capitalized + and the prefix is not used. + :param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically. :param node_dependency_hints: A list of tasks, launchplans, or workflows that this task depends on. This is only for dynamic tasks/workflows, where flyte cannot automatically determine the dependencies prior to runtime. diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 98374ff26c..d87f4d7685 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -35,7 +35,8 @@ from flytekit import ImageSpec from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.clients.helpers import iterate_node_executions, iterate_task_executions -from flytekit.configuration import Config, FastSerializationSettings, ImageConfig, SerializationSettings +from flytekit.configuration import Config, DataConfig, FastSerializationSettings, ImageConfig, SerializationSettings +from flytekit.configuration.file import ConfigFile from flytekit.constants import CopyFileDetection from flytekit.core import constants, utils from flytekit.core.array_node_map_task import ArrayNodeMapTask @@ -898,7 +899,6 @@ def register_workflow( domain=self.default_domain, ) - self._resolve_identifier(ResourceType.WORKFLOW, entity.name, version, serialization_settings) ident = run_sync( self._serialize_and_register, entity, serialization_settings, version, options, default_launch_plan ) @@ -1086,7 +1086,7 @@ def _get_image_names(self, entity: typing.Union[PythonAutoContainerTask, Workflo def register_script( self, - entity: typing.Union[WorkflowBase, PythonTask], + entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan], image_config: typing.Optional[ImageConfig] = None, version: typing.Optional[str] = None, project: typing.Optional[str] = None, @@ -1099,7 +1099,7 @@ def register_script( module_name: typing.Optional[str] = None, envs: typing.Optional[typing.Dict[str, str]] = None, fast_package_options: typing.Optional[FastPackageOptions] = None, - ) -> typing.Union[FlyteWorkflow, FlyteTask]: + ) -> typing.Union[FlyteWorkflow, FlyteTask, FlyteLaunchPlan]: """ Use this method to register a workflow via script mode. :param destination_dir: The destination directory where the workflow will be copied to. @@ -1170,9 +1170,13 @@ def register_script( if isinstance(entity, PythonTask): return self.register_task(entity, serialization_settings, version) - fwf = self.register_workflow(entity, serialization_settings, version, default_launch_plan, options) - fwf._python_interface = entity.python_interface - return fwf + + if isinstance(entity, WorkflowBase): + return self.register_workflow(entity, serialization_settings, version, default_launch_plan, options) + if isinstance(entity, LaunchPlan): + # If it's a launch plan, we need to register the workflow first + return self.register_launch_plan(entity, version, project, domain, options, serialization_settings) + raise ValueError(f"Unsupported entity type {type(entity)}") def register_launch_plan( self, @@ -1181,6 +1185,7 @@ def register_launch_plan( project: typing.Optional[str] = None, domain: typing.Optional[str] = None, options: typing.Optional[Options] = None, + serialization_settings: typing.Optional[SerializationSettings] = None, ) -> FlyteLaunchPlan: """ Register a given launchplan, possibly applying overrides from the provided options. @@ -1188,22 +1193,28 @@ def register_launch_plan( :param version: :param project: Optionally provide a project, if not already provided in flyteremote constructor or a separate one :param domain: Optionally provide a domain, if not already provided in FlyteRemote constructor or a separate one + :param serialization_settings: Optionally provide serialization settings, if not provided, will use the default :param options: :return: """ - ss = SerializationSettings( - image_config=ImageConfig(), - project=project or self.default_project, - domain=domain or self.default_domain, - version=version, + if serialization_settings is None: + _, _, _, module_file = extract_task_module(entity) + project_root = _find_project_root(module_file) + serialization_settings = SerializationSettings( + image_config=ImageConfig.auto_default_image(), + source_root=project_root, + project=project or self.default_project, + domain=domain or self.default_domain, + ) + + ident = run_sync( + self._serialize_and_register, + entity, + serialization_settings, + version, + options, + False, ) - ident = self._resolve_identifier(ResourceType.LAUNCH_PLAN, entity.name, version, ss) - m = OrderedDict() - idl_lp = get_serializable_launch_plan(m, ss, entity, recurse_downstream=False, options=options) - try: - self.client.create_launch_plan(ident, idl_lp.spec) - except FlyteEntityAlreadyExistsException: - logger.debug("Launchplan already exists, ignoring") flp = self.fetch_launch_plan(ident.project, ident.domain, ident.name, ident.version) flp._python_interface = entity.python_interface return flp @@ -2617,3 +2628,67 @@ def _pickle_and_upload_entity(self, entity: typing.Any) -> typing.Tuple[bytes, F md5_bytes, native_url = self.upload_file(dest) return md5_bytes, FastSerializationSettings(enabled=True, distribution_location=native_url, destination_dir=".") + + @classmethod + def for_endpoint( + cls, + endpoint: str, + insecure: bool = False, + data_config: typing.Optional[DataConfig] = None, + config_file: typing.Union[str, ConfigFile] = None, + default_project: typing.Optional[str] = None, + default_domain: typing.Optional[str] = None, + data_upload_location: str = "flyte://my-s3-bucket/", + interactive_mode_enabled: bool = False, + **kwargs, + ) -> "FlyteRemote": + return cls( + config=Config.for_endpoint( + endpoint=endpoint, + insecure=insecure, + data_config=data_config, + config_file=config_file, + ), + default_project=default_project, + default_domain=default_domain, + data_upload_location=data_upload_location, + interactive_mode_enabled=interactive_mode_enabled, + **kwargs, + ) + + @classmethod + def auto( + cls, + config_file: typing.Union[str, ConfigFile] = None, + default_project: typing.Optional[str] = None, + default_domain: typing.Optional[str] = None, + data_upload_location: str = "flyte://my-s3-bucket/", + interactive_mode_enabled: bool = False, + **kwargs, + ) -> "FlyteRemote": + return cls( + config=Config.auto(config_file=config_file), + default_project=default_project, + default_domain=default_domain, + data_upload_location=data_upload_location, + interactive_mode_enabled=interactive_mode_enabled, + **kwargs, + ) + + @classmethod + def for_sandbox( + cls, + default_project: typing.Optional[str] = None, + default_domain: typing.Optional[str] = None, + data_upload_location: str = "flyte://my-s3-bucket/", + interactive_mode_enabled: bool = False, + **kwargs, + ) -> "FlyteRemote": + return cls( + config=Config.for_sandbox(), + default_project=default_project, + default_domain=default_domain, + data_upload_location=data_upload_location, + interactive_mode_enabled=interactive_mode_enabled, + **kwargs, + ) diff --git a/flytekit/tools/fast_registration.py b/flytekit/tools/fast_registration.py index 125b8024d4..3bf209700c 100644 --- a/flytekit/tools/fast_registration.py +++ b/flytekit/tools/fast_registration.py @@ -59,9 +59,12 @@ def print_ls_tree(source: os.PathLike, ls: typing.List[str]): current = tree_root current_path = pathlib.Path(source) for subdir in fpp.parent.relative_to(source).parts: - current = current.add(f"{subdir}", guide_style="bold bright_blue") current_path = current_path / subdir - trees[current_path] = current + if current_path not in trees: + current = current.add(f"{subdir}", guide_style="bold bright_blue") + trees[current_path] = current + else: + current = trees[current_path] trees[fpp.parent].add(f"{fpp.name}", guide_style="bold bright_blue") rich_print(tree_root) diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index 52361f0008..12a1b1ca28 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -607,6 +607,13 @@ def to_literal( # In case it's a FlyteSchema sdt = StructuredDatasetType(format=self.DEFAULT_FORMATS.get(python_type, GENERIC_FORMAT)) + if issubclass(python_type, StructuredDataset) and not isinstance(python_val, StructuredDataset): + # Catch a common mistake + raise TypeTransformerFailedError( + f"Expected a StructuredDataset instance, but got {type(python_val)} instead." + f" Did you forget to wrap your dataframe in a StructuredDataset instance?" + ) + if expected and expected.structured_dataset_type: sdt = StructuredDatasetType( columns=expected.structured_dataset_type.columns, diff --git a/plugins/flytekit-greatexpectations/setup.py b/plugins/flytekit-greatexpectations/setup.py index 7e8af4ddef..bba579442b 100644 --- a/plugins/flytekit-greatexpectations/setup.py +++ b/plugins/flytekit-greatexpectations/setup.py @@ -8,7 +8,7 @@ "flytekit>=1.5.0", "great-expectations>=0.13.30,<1.0.0", "sqlalchemy>=1.4.23", - "pyspark==3.3.1", + "pyspark==3.3.2", "s3fs<2023.6.0", ] diff --git a/tests/flytekit/unit/core/test_context_manager.py b/tests/flytekit/unit/core/test_context_manager.py index 70a2d552d8..94c8415a1d 100644 --- a/tests/flytekit/unit/core/test_context_manager.py +++ b/tests/flytekit/unit/core/test_context_manager.py @@ -17,7 +17,7 @@ SecretsConfig, SerializationSettings, ) -from flytekit.core import mock_stats +from flytekit.core import mock_stats, context_manager from flytekit.core.context_manager import ExecutionParameters, FlyteContext, FlyteContextManager, SecretsManager from flytekit.models.core import identifier as id_models @@ -239,6 +239,48 @@ def test_secrets_manager_env(): assert sec.get(group="group", key="key") == "value" +@pytest.mark.parametrize("is_local_execution, prefix", [(True, ""), (False, "_FSEC_")]) +def test_secrets_manager_execution(monkeypatch, is_local_execution, prefix): + if not is_local_execution: + execution_state = context_manager.ExecutionState.Mode.TASK_EXECUTION + else: + execution_state = context_manager.ExecutionState.Mode.LOCAL_TASK_EXECUTION + + sec = SecretsManager() + + monkeypatch.setenv(f"{prefix}ABC_XYZ", "my-abc-secret") + + ctx = FlyteContext.current_context() + with FlyteContextManager.with_context( + ctx.with_execution_state(ctx.execution_state.with_params(mode=execution_state)) + ): + assert sec.get(group="ABC", key="XYZ") == "my-abc-secret" + + +@pytest.mark.parametrize("is_local_execution, prefix", [(True, ""), (False, "_FSEC_")]) +def test_secrets_manager_execution_no_group_required(monkeypatch, is_local_execution, prefix): + # Remove group requirements + plugin_mock = Mock() + plugin_mock.secret_requires_group.return_value = False + mock_global_plugin = {"plugin": plugin_mock} + monkeypatch.setattr(flytekit.configuration.plugin, "_GLOBAL_CONFIG", mock_global_plugin) + + if not is_local_execution: + execution_state = context_manager.ExecutionState.Mode.TASK_EXECUTION + else: + execution_state = context_manager.ExecutionState.Mode.LOCAL_TASK_EXECUTION + + sec = SecretsManager() + + monkeypatch.setenv(f"{prefix}XYZ", "my-abc-secret") + + ctx = FlyteContext.current_context() + with FlyteContextManager.with_context( + ctx.with_execution_state(ctx.execution_state.with_params(mode=execution_state)) + ): + assert sec.get(key="XYZ") == "my-abc-secret" + + def test_serialization_settings_transport(): default_img = Image(name="default", fqn="test", tag="tag") serialization_settings = SerializationSettings( diff --git a/tests/flytekit/unit/core/test_type_engine.py b/tests/flytekit/unit/core/test_type_engine.py index c63199663b..2f8e5a8a3e 100644 --- a/tests/flytekit/unit/core/test_type_engine.py +++ b/tests/flytekit/unit/core/test_type_engine.py @@ -3692,3 +3692,16 @@ def test_structured_dataset_collection(): lv = TypeEngine.to_literal(FlyteContext.current_context(), [[StructuredDataset(df)]], WineTypeListList, lt) assert lv is not None + + +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_structured_dataset_mismatch(): + import pandas as pd + + df = pd.DataFrame({"alcohol": [1.0, 2.0], "malic_acid": [2.0, 3.0]}) + transformer = TypeEngine.get_transformer(StructuredDataset) + with pytest.raises(TypeTransformerFailedError): + transformer.to_literal(FlyteContext.current_context(), df, StructuredDataset, TypeEngine.to_literal_type(StructuredDataset)) + + with pytest.raises(TypeTransformerFailedError): + TypeEngine.to_literal(FlyteContext.current_context(), df, StructuredDataset, TypeEngine.to_literal_type(StructuredDataset)) diff --git a/tests/flytekit/unit/core/test_workflows.py b/tests/flytekit/unit/core/test_workflows.py index bdbc1330e3..41c87399e2 100644 --- a/tests/flytekit/unit/core/test_workflows.py +++ b/tests/flytekit/unit/core/test_workflows.py @@ -466,8 +466,16 @@ def wf(): assert ctx.compilation_state is None +@pytest.mark.parametrize( + "error_message", [ + "Fail!", + None, + "", + ("big", "boom!") + ] +) @patch("builtins.print") -def test_failure_node_local_execution(mock_print, exec_prefix): +def test_failure_node_local_execution(mock_print, error_message, exec_prefix): @task def clean_up(name: str, err: typing.Optional[FlyteError] = None): print(f"Deleting cluster {name} due to {err}") @@ -485,7 +493,7 @@ def delete_cluster(name: str, err: typing.Optional[FlyteError] = None): @task def t1(a: int, b: str): print(f"{a} {b}") - raise ValueError("Fail!") + raise ValueError(error_message) @workflow(on_failure=clean_up) def wf(name: str = "flyteorg"): @@ -499,7 +507,7 @@ def wf(name: str = "flyteorg"): # Adjusted the error message to match the one in the failure expected_error_message = str( - FlyteError(message=f"Error encountered while executing '{exec_prefix}tests.flytekit.unit.core.test_workflows.t1':\n Fail!", failed_node_id="fn0") + FlyteError(message=f"Error encountered while executing '{exec_prefix}tests.flytekit.unit.core.test_workflows.t1':\n {error_message}", failed_node_id="fn0") ) assert mock_print.call_count > 0 diff --git a/tests/flytekit/unit/interaction/test_click_types.py b/tests/flytekit/unit/interaction/test_click_types.py index 419a88c53a..d3956fdba3 100644 --- a/tests/flytekit/unit/interaction/test_click_types.py +++ b/tests/flytekit/unit/interaction/test_click_types.py @@ -1,6 +1,7 @@ import os from dataclasses import field import json +import sys import tempfile import typing from datetime import datetime, timedelta @@ -501,6 +502,9 @@ class Datum: assert v.w[0].b == "list_item" +@pytest.mark.skipif( + sys.version_info < (3, 10), reason="handling for windows is nicer with delete_on_close, which doesn't exist in 3.9" +) def test_pickle_type(): t = PickleParamType() value = {"a": "b"} @@ -518,9 +522,10 @@ def test_pickle_type(): t.convert("typing:not_exists", None, None) # test that it can load a variable from a module - with tempfile.NamedTemporaryFile("w", dir=".", suffix=".py", delete=False) as f: + with tempfile.NamedTemporaryFile("w", dir=".", suffix=".py", delete=True, delete_on_close=False) as f: f.write("a = 1") f.flush() + f.close() # find the base name of the file basename = os.path.basename(f.name).split(".")[0] assert t.convert(f"{basename}:a", None, None) == 1