Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update branch #2844

Merged
4 changes: 4 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
from importlib.metadata import entry_points

from flytekit._version import __version__
from flytekit.configuration import Config
from flytekit.core.array_node_map_task import map_task
from flytekit.core.artifact import Artifact
from flytekit.core.base_sql_task import SQLTask
Expand Down Expand Up @@ -250,8 +251,11 @@
from flytekit.models.documentation import Description, Documentation, SourceCode
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType
from flytekit.remote import FlyteRemote
from flytekit.sensor.sensor_engine import SensorEngine
from flytekit.types import directory, file, iterator
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
StructuredDatasetFormat,
Expand Down
59 changes: 45 additions & 14 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from rich.progress import Progress, TextColumn, TimeElapsedColumn
from typing_extensions import get_origin

from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, Literal, WorkflowExecutionPhase
from flytekit import Annotations, FlyteContext, FlyteContextManager, Labels, LaunchPlan, Literal, WorkflowExecutionPhase
from flytekit.clis.sdk_in_container.helpers import (
parse_copy,
patch_image_config,
Expand Down Expand Up @@ -369,13 +369,25 @@ class Entities(typing.NamedTuple):

workflows: typing.List[str]
tasks: typing.List[str]
launch_plans: typing.List[typing.Tuple[str, str]] # LP is stored as a tuple of name, the variable name in the file

def all(self) -> typing.List[str]:
e = []
e.extend(self.workflows)
e.extend(self.tasks)
for i in self.launch_plans:
e.append(i[0])
return e

def matching_lp(self, lp_name: str) -> typing.Optional[str]:
"""
Returns the variable name of the launch plan in the file
"""
for i in self.launch_plans:
if i[0] == lp_name:
return i[1]
return None


def get_entities_in_file(filename: pathlib.Path, should_delete: bool) -> Entities:
"""
Expand All @@ -393,17 +405,25 @@ def get_entities_in_file(filename: pathlib.Path, should_delete: bool) -> Entitie

workflows = []
tasks = []
launch_plans = []
module = importlib.import_module(module_name)
for name in dir(module):
o = module.__dict__[name]
if isinstance(o, WorkflowBase):
workflows.append(name)
elif isinstance(o, PythonTask):
tasks.append(name)
elif isinstance(o, LaunchPlan):
varname = name
if o.name:
# name refers to the variable name, while o.name refers to the launch plan name if the user has
# specified one
name = o.name
launch_plans.append((varname, name))

if should_delete and os.path.exists(filename):
os.remove(filename)
return Entities(workflows, tasks)
return Entities(workflows, tasks, launch_plans)


def to_click_option(
Expand Down Expand Up @@ -592,7 +612,7 @@ def is_optional(_type):
return typing.get_origin(_type) is typing.Union and type(None) in typing.get_args(_type)


def run_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask]):
def run_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask, LaunchPlan]):
"""
Returns a function that is used to implement WorkflowCommand and execute a flyte workflow.
"""
Expand All @@ -604,7 +624,11 @@ def _run(*args, **kwargs):
# By the time we get to this function, all the loading has already happened

run_level_params: RunLevelParams = ctx.obj
entity_type = "workflow" if isinstance(entity, PythonFunctionWorkflow) else "task"
entity_type = "workflow"
if isinstance(entity, LaunchPlan):
entity_type = "launch plan"
elif isinstance(entity, PythonTask):
entity_type = "task"
logger.debug(f"Running {entity_type} {entity.name} with input {kwargs}")

click.secho(
Expand Down Expand Up @@ -981,7 +1005,7 @@ def __init__(self, filename: str, *args, **kwargs):
else:
self._filename = pathlib.Path(filename).resolve()
self._should_delete = False
self._entities = None
self._entities: typing.Optional[Entities] = None

def list_commands(self, ctx):
if self._entities:
Expand All @@ -995,8 +1019,8 @@ def _create_command(
ctx: click.Context,
entity_name: str,
run_level_params: RunLevelParams,
loaded_entity: [PythonTask, WorkflowBase],
is_workflow: bool,
loaded_entity: [PythonTask, WorkflowBase, LaunchPlan],
entity_type: str,
):
"""
Delegate that creates the command for a given entity.
Expand All @@ -1014,10 +1038,12 @@ def _create_command(
required = type(None) not in get_args(python_type) and default_val is None
params.append(to_click_option(ctx, flyte_ctx, input_name, literal_var, python_type, default_val, required))

entity_type = "Workflow" if is_workflow else "Task"
h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})"
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
if isinstance(loaded_entity, LaunchPlan):
h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow")
else:
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
cmd = YamlFileReadingCommand(
name=entity_name,
params=params,
Expand All @@ -1036,9 +1062,14 @@ def get_command(self, ctx, exe_entity):
function.
:return:
"""
is_workflow = False
entity_type = "task"
if self._entities:
is_workflow = exe_entity in self._entities.workflows
if exe_entity in self._entities.workflows:
entity_type = "workflow"
else:
lp_name = self._entities.matching_lp(exe_entity)
if lp_name:
entity_type = "launch plan"
if not os.path.exists(self._filename):
click.secho(f"File {self._filename} does not exist.", fg="red")
exit(1)
Expand All @@ -1062,7 +1093,7 @@ def get_command(self, ctx, exe_entity):

entity = load_naive_entity(module, exe_entity, project_root)

return self._create_command(ctx, exe_entity, run_level_params, entity, is_workflow)
return self._create_command(ctx, exe_entity, run_level_params, entity, entity_type)


class RunCommand(click.RichGroup):
Expand Down Expand Up @@ -1106,7 +1137,7 @@ def get_command(self, ctx, filename):
return RemoteEntityGroup(RemoteEntityGroup.WORKFLOW_COMMAND)
elif filename == RemoteEntityGroup.TASK_COMMAND:
return RemoteEntityGroup(RemoteEntityGroup.TASK_COMMAND)
return WorkflowCommand(filename, name=filename, help=f"Run a [workflow|task] from {filename}")
return WorkflowCommand(filename, name=filename, help=f"Run a [workflow|task|launch plan] from {filename}")


_run_help = """
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ def dispatch_execute(
raise
except Exception as e:
if is_local_execution:
e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e.args[0]}",)
e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e}",)
raise
raise FlyteNonRecoverableSystemException(e) from e
# TODO: Logger should auto inject the current context information to indicate if the task is running within
Expand All @@ -755,7 +755,7 @@ def dispatch_execute(
except Exception as e:
if is_local_execution:
# If the task is being executed locally, we want to raise the original exception
e.args = (f"Error encountered while executing '{self.name}':\n {e.args[0]}",)
e.args = (f"Error encountered while executing '{self.name}':\n {e}",)
raise
raise FlyteUserRuntimeException(e) from e

Expand Down
31 changes: 26 additions & 5 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def __getattr__(self, item: str) -> str:
def __init__(self, secrets_cfg: typing.Optional[SecretsConfig] = None):
if secrets_cfg is None:
secrets_cfg = SecretsConfig.auto()

self._base_dir = secrets_cfg.default_dir.strip()
self._file_prefix = secrets_cfg.file_prefix.strip()
self._env_prefix = secrets_cfg.env_prefix.strip()
Expand Down Expand Up @@ -373,11 +374,22 @@ def get(
if not get_plugin().secret_requires_group():
group, group_version = None, None

env_var = self.get_secrets_env_var(group, key, group_version)
env_prefixes = [self._env_prefix]

# During local execution check for the key without a prefix
ctx = FlyteContextManager.current_context()
if ctx.execution_state is None or ctx.execution_state.is_local_execution():
env_prefixes.append("")

for env_prefix in env_prefixes:
env_var = self._get_secrets_env_var(
group=group, key=key, group_version=group_version, env_prefix=env_prefix
)
v = os.environ.get(env_var)
if v is not None:
return v.strip()

fpath = self.get_secrets_file(group, key, group_version)
v = os.environ.get(env_var)
if v is not None:
return v.strip()
if os.path.exists(fpath):
with open(fpath, encode_mode) as f:
return f.read().strip()
Expand All @@ -392,8 +404,17 @@ def get_secrets_env_var(
"""
Returns a string that matches the ENV Variable to look for the secrets
"""
return self._get_secrets_env_var(group=group, key=key, group_version=group_version, env_prefix=self._env_prefix)

def _get_secrets_env_var(
self,
group: Optional[str] = None,
key: Optional[str] = None,
group_version: Optional[str] = None,
env_prefix: str = "",
):
l = [k.upper() for k in filter(None, (group, group_version, key))]
return f"{self._env_prefix}{'_'.join(l)}"
return f"{env_prefix}{'_'.join(l)}"

def get_secrets_file(
self, group: Optional[str] = None, key: Optional[str] = None, group_version: Optional[str] = None
Expand Down
7 changes: 7 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ def foo2():
Possible options for secret stores are - Vault, Confidant, Kube secrets, AWS KMS etc
Refer to :py:class:`Secret` to understand how to specify the request for a secret. It
may change based on the backend provider.

.. note::

During local execution, the secrets will be pulled from the local environment variables
with the format `{GROUP}_{GROUP_VERSION}_{KEY}`, where all the characters are capitalized
and the prefix is not used.

:param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically.
:param node_dependency_hints: A list of tasks, launchplans, or workflows that this task depends on. This is only
for dynamic tasks/workflows, where flyte cannot automatically determine the dependencies prior to runtime.
Expand Down
Loading
Loading