diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 3762dec92d..4181d0c928 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -488,6 +488,9 @@ class Mode(Enum): # or propeller. LOCAL_TASK_EXECUTION = 3 + # This is the mode that is used to indicate a dynamic task + DYNAMIC_TASK_EXECUTION = 4 + mode: Optional[ExecutionState.Mode] working_dir: Union[os.PathLike, str] engine_dir: Optional[Union[os.PathLike, str]] diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index b378e23ecf..e1e80a4227 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -202,7 +202,12 @@ def compile_into_workflow( else: cs = ctx.compilation_state.with_params(prefix="d") - with FlyteContextManager.with_context(ctx.with_compilation_state(cs)): + updated_ctx = ctx.with_compilation_state(cs) + if self.execution_mode == self.ExecutionBehavior.DYNAMIC: + es = ctx.new_execution_state().with_params(mode=ExecutionState.Mode.DYNAMIC_TASK_EXECUTION) + updated_ctx = updated_ctx.with_execution_state(es) + + with FlyteContextManager.with_context(updated_ctx): # TODO: Resolve circular import from flytekit.tools.translator import get_serializable