From 9e9f8f26cf9e16b25558fbca84179f0da78d267a Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 12 Sep 2023 22:23:37 -0700 Subject: [PATCH 1/2] Pyflyte register auto activates schedule Signed-off-by: Ketan Umare --- flytekit/clis/sdk_in_container/build.py | 2 +- flytekit/clis/sdk_in_container/constants.py | 52 ---------------- flytekit/clis/sdk_in_container/launchplan.py | 57 ++++++++---------- flytekit/clis/sdk_in_container/register.py | 21 ++++--- flytekit/clis/sdk_in_container/run.py | 34 ++++------- flytekit/clis/sdk_in_container/utils.py | 63 ++++++++++++++++++++ flytekit/remote/remote.py | 7 +++ flytekit/tools/repo.py | 16 ++++- 8 files changed, 133 insertions(+), 119 deletions(-) diff --git a/flytekit/clis/sdk_in_container/build.py b/flytekit/clis/sdk_in_container/build.py index 89bcd417df..c4eb819eb6 100644 --- a/flytekit/clis/sdk_in_container/build.py +++ b/flytekit/clis/sdk_in_container/build.py @@ -4,8 +4,8 @@ import rich_click as click from typing_extensions import OrderedDict -from flytekit.clis.sdk_in_container.constants import make_field from flytekit.clis.sdk_in_container.run import RunCommand, RunLevelParams, WorkflowCommand +from flytekit.clis.sdk_in_container.utils import make_field from flytekit.configuration import ImageConfig, SerializationSettings from flytekit.core.base_task import PythonTask from flytekit.core.workflow import PythonFunctionWorkflow diff --git a/flytekit/clis/sdk_in_container/constants.py b/flytekit/clis/sdk_in_container/constants.py index 8d22b32729..dd9c6f4e87 100644 --- a/flytekit/clis/sdk_in_container/constants.py +++ b/flytekit/clis/sdk_in_container/constants.py @@ -1,10 +1,3 @@ -import typing -from dataclasses import Field, dataclass, field -from types import MappingProxyType - -import click -import rich_click as _click - CTX_PROJECT = "project" CTX_DOMAIN = "domain" CTX_VERSION = "version" @@ -13,48 +6,3 @@ CTX_NOTIFICATIONS = "notifications" CTX_CONFIG_FILE = "config_file" CTX_VERBOSE = "verbose" - - -def make_field(o: click.Option) -> Field: - if o.multiple: - o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}" - return field(default_factory=lambda: o.default, metadata={"click.option": o}) - return field(default=o.default, metadata={"click.option": o}) - - -def get_option_from_metadata(metadata: MappingProxyType) -> click.Option: - return metadata["click.option"] - - -@dataclass -class PyFlyteParams: - config_file: typing.Optional[str] = None - verbose: bool = False - pkgs: typing.List[str] = field(default_factory=list) - - @classmethod - def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams": - return cls(**d) - - -project_option = _click.option( - "-p", - "--project", - required=True, - type=str, - help="Flyte project to use. You can have more than one project per repo", -) -domain_option = _click.option( - "-d", - "--domain", - required=True, - type=str, - help="This is usually development, staging, or production", -) -version_option = _click.option( - "-v", - "--version", - required=False, - type=str, - help="This is the version to apply globally for this context", -) diff --git a/flytekit/clis/sdk_in_container/launchplan.py b/flytekit/clis/sdk_in_container/launchplan.py index 2d33e2e3d7..b5022ef5d7 100644 --- a/flytekit/clis/sdk_in_container/launchplan.py +++ b/flytekit/clis/sdk_in_container/launchplan.py @@ -1,6 +1,8 @@ import rich_click as click +from rich.progress import Progress from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context +from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec from flytekit.models.launch_plan import LaunchPlanState _launchplan_help = """ @@ -13,22 +15,8 @@ @click.command("launchplan", help=_launchplan_help) -@click.option( - "-p", - "--project", - required=False, - type=str, - default="flytesnacks", - help="Fecth launchplan from this project", -) -@click.option( - "-d", - "--domain", - required=False, - type=str, - default="development", - help="Fetch launchplan from this domain", -) +@project_option_dec +@domain_option_dec @click.option( "--activate/--deactivate", required=True, @@ -57,18 +45,25 @@ def launchplan( launchplan_version: str, ): remote = get_and_save_remote_with_click_context(ctx, project, domain) - try: - launchplan = remote.fetch_launch_plan( - project=project, - domain=domain, - name=launchplan, - version=launchplan_version, - ) - state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE - remote.client.update_launch_plan(id=launchplan.id, state=state) - click.secho( - f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}", - fg="green", - ) - except StopIteration as e: - click.secho(f"{e.value}", fg="red") + with Progress() as progress: + t1 = progress.add_task(f"[cyan] {'Activating' if activate else 'Deactivating'}...", total=1) + try: + progress.start_task(t1) + launchplan = remote.fetch_launch_plan( + project=project, + domain=domain, + name=launchplan, + version=launchplan_version, + ) + progress.advance(t1) + + state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE + remote.client.update_launch_plan(id=launchplan.id, state=state) + progress.advance(t1) + progress.update(t1, completed=True, visible=False) + click.secho( + f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}", + fg="green", + ) + except StopIteration as e: + click.secho(f"{e.value}", fg="red") diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index afc7aeb99e..8efd65f98a 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -6,6 +6,7 @@ from flytekit.clis.helpers import display_help_with_error from flytekit.clis.sdk_in_container import constants from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context, patch_image_config +from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec from flytekit.configuration import ImageConfig from flytekit.configuration.default_images import DefaultImages from flytekit.loggers import cli_logger @@ -27,14 +28,8 @@ @click.command("register", help=_register_help) -@click.option( - "-p", - "--project", - required=False, - type=str, - default="flytesnacks", - help="Project to register and run this workflow in", -) +@project_option_dec +@domain_option_dec @click.option( "-d", "--domain", @@ -113,6 +108,14 @@ is_flag=True, help="Execute registration in dry-run mode. Skips actual registration to remote", ) +@click.option( + "--activate-launchplans", + "--activate-launchplan", + default=False, + is_flag=True, + help="Flip current active version to the newly registered version of an active LaunchPlan. This combines to " + "operations into one", +) @click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1) @click.pass_context def register( @@ -129,6 +132,7 @@ def register( non_fast: bool, package_or_module: typing.Tuple[str], dry_run: bool, + activate_launchplans: bool, ): """ see help @@ -179,6 +183,7 @@ def register( package_or_module=package_or_module, remote=remote, dry_run=dry_run, + activate_launchplans=activate_launchplans, ) except Exception as e: raise e diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index bb332673ab..0ee09dc981 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -14,9 +14,15 @@ from rich.progress import Progress from flytekit import Annotations, FlyteContext, Labels, Literal -from flytekit.clis.sdk_in_container.constants import PyFlyteParams, get_option_from_metadata, make_field from flytekit.clis.sdk_in_container.helpers import get_remote, patch_image_config -from flytekit.clis.sdk_in_container.utils import pretty_print_exception +from flytekit.clis.sdk_in_container.utils import ( + PyFlyteParams, + domain_option, + get_option_from_metadata, + make_field, + pretty_print_exception, + project_option, +) from flytekit.configuration import DefaultImages, ImageConfig from flytekit.core import context_manager from flytekit.core.base_task import PythonTask @@ -53,28 +59,8 @@ class RunLevelParams(PyFlyteParams): This class is used to store the parameters that are used to run a workflow / task / launchplan. """ - project: str = make_field( - click.Option( - param_decls=["-p", "--project"], - required=False, - type=str, - default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"), - show_default=True, - help="Project to register and run this workflow in. Can also be set through envvar " - "``FLYTE_DEFAULT_PROJECT``", - ) - ) - domain: str = make_field( - click.Option( - param_decls=["-d", "--domain"], - required=False, - type=str, - default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"), - show_default=True, - help="Domain to register and run this workflow in, can also be set through envvar " - "``FLYTE_DEFAULT_DOMAIN``", - ) - ) + project: str = make_field(project_option) + domain: str = make_field(domain_option) destination_dir: str = make_field( click.Option( param_decls=["--destination-dir", "destination_dir"], diff --git a/flytekit/clis/sdk_in_container/utils.py b/flytekit/clis/sdk_in_container/utils.py index 8b08c7761a..3f975913e0 100644 --- a/flytekit/clis/sdk_in_container/utils.py +++ b/flytekit/clis/sdk_in_container/utils.py @@ -1,4 +1,7 @@ +import os import typing +from dataclasses import Field, dataclass, field +from types import MappingProxyType import grpc import rich_click as click @@ -9,6 +12,44 @@ from flytekit.exceptions.user import FlyteInvalidInputException from flytekit.loggers import cli_logger +project_option = click.Option( + param_decls=["-p", "--project"], + required=False, + type=str, + default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"), + show_default=True, + help="Project to register and run this workflow in. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``", +) + +domain_option = click.Option( + param_decls=["-d", "--domain"], + required=False, + type=str, + default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"), + show_default=True, + help="Domain to register and run this workflow in, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``", +) + +project_option_dec = click.option( + "-p", + "--project", + required=False, + type=str, + default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"), + show_default=True, + help="Project for workflow/launchplan. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``", +) + +domain_option_dec = click.option( + "-d", + "--domain", + required=False, + type=str, + default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"), + show_default=True, + help="Domain for workflow/launchplan, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``", +) + def validate_package(ctx, param, values): """ @@ -87,3 +128,25 @@ def invoke(self, ctx: click.Context) -> typing.Any: raise e pretty_print_exception(e) raise SystemExit(e) from e + + +def make_field(o: click.Option) -> Field: + if o.multiple: + o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}" + return field(default_factory=lambda: o.default, metadata={"click.option": o}) + return field(default=o.default, metadata={"click.option": o}) + + +def get_option_from_metadata(metadata: MappingProxyType) -> click.Option: + return metadata["click.option"] + + +@dataclass +class PyFlyteParams: + config_file: typing.Optional[str] = None + verbose: bool = False + pkgs: typing.List[str] = field(default_factory=list) + + @classmethod + def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams": + return cls(**d) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 74df68df45..9c5ad111e5 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -60,6 +60,7 @@ NotificationList, WorkflowExecutionGetDataResponse, ) +from flytekit.models.launch_plan import LaunchPlanState from flytekit.models.literals import Literal, LiteralMap from flytekit.remote.backfill import create_backfill_workflow from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow @@ -1957,3 +1958,9 @@ def get_extra_headers_for_protocol(native_url): if native_url.startswith("abfs://"): return {"x-ms-blob-type": "BlockBlob"} return {} + + def activate_launchplan(self, ident: Identifier): + """ + Given a launchplan, activate it, all previous versions are deactivated. + """ + self.client.update_launch_plan(id=ident, state=LaunchPlanState.ACTIVE) diff --git a/flytekit/tools/repo.py b/flytekit/tools/repo.py index e55350d3ae..5473b4cef8 100644 --- a/flytekit/tools/repo.py +++ b/flytekit/tools/repo.py @@ -183,7 +183,7 @@ def load_packages_and_modules( return registrable_entities -def secho(i: Identifier, state: str = "success", reason: str = None): +def secho(i: Identifier, state: str = "success", reason: str = None, op: str = "Registration"): state_ind = "[ ]" fg = "white" nl = False @@ -198,7 +198,7 @@ def secho(i: Identifier, state: str = "success", reason: str = None): nl = True reason = "skipped!" click.secho( - click.style(f"{state_ind}", fg=fg) + f" Registration {i.name} type {i.resource_type_name()} {reason}", + click.style(f"{state_ind}", fg=fg) + f" {op} {i.name} type {i.resource_type_name()} {reason}", dim=True, nl=nl, ) @@ -218,6 +218,7 @@ def register( package_or_module: typing.Tuple[str], remote: FlyteRemote, dry_run: bool = False, + activate_launchplans: bool = False, ): detected_root = find_common_root(package_or_module) click.secho(f"Detected Root {detected_root}, using this to create deployable package...", fg="yellow") @@ -262,7 +263,12 @@ def register( return for cp_entity in registrable_entities: - og_id = cp_entity.id if isinstance(cp_entity, launch_plan.LaunchPlan) else cp_entity.template.id + is_lp = False + if isinstance(cp_entity, launch_plan.LaunchPlan): + og_id = cp_entity.id + is_lp = True + else: + og_id = cp_entity.template.id secho(og_id, "") try: if not dry_run: @@ -270,6 +276,10 @@ def register( cp_entity, serialization_settings, version=version, create_default_launchplan=False ) secho(i) + if is_lp and activate_launchplans: + secho(og_id, "", op="Activation") + remote.activate_launchplan(i) + secho(i, reason="activated", op="Activation") else: secho(og_id, reason="Dry run Mode!") except RegistrationSkipped: From 2ab16b9162d19b6ce81db90fb01067347e164985 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 13 Sep 2023 15:39:37 -0700 Subject: [PATCH 2/2] comment addressed Signed-off-by: Ketan Umare --- flytekit/clis/sdk_in_container/register.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 8efd65f98a..2313b00fc6 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -113,8 +113,7 @@ "--activate-launchplan", default=False, is_flag=True, - help="Flip current active version to the newly registered version of an active LaunchPlan. This combines to " - "operations into one", + help="Activate newly registered Launchplans. This operation deactivates previous versions of Launchplans.", ) @click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1) @click.pass_context