From 716fde36b5ae3f4e96cd7a1b52fa5f73150c8e93 Mon Sep 17 00:00:00 2001 From: Ketan Umare <16888709+kumare3@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:47:32 -0700 Subject: [PATCH] Backfill command now supports failure-policy (#1840) Signed-off-by: Ketan Umare --- flytekit/clis/sdk_in_container/backfill.py | 35 ++++++++++++---------- flytekit/remote/backfill.py | 11 +++++-- flytekit/remote/remote.py | 10 +++++-- tests/flytekit/unit/remote/test_remote.py | 14 +++++++-- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/flytekit/clis/sdk_in_container/backfill.py b/flytekit/clis/sdk_in_container/backfill.py index 23ae7b6b7c..7723a98f44 100644 --- a/flytekit/clis/sdk_in_container/backfill.py +++ b/flytekit/clis/sdk_in_container/backfill.py @@ -3,7 +3,9 @@ import rich_click as click +from flytekit import WorkflowFailurePolicy from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context +from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec from flytekit.interaction.click_types import DateTimeType, DurationParamType _backfill_help = """ @@ -42,22 +44,8 @@ def resolve_backfill_window( @click.command("backfill", help=_backfill_help) -@click.option( - "-p", - "--project", - required=False, - type=str, - default="flytesnacks", - help="Project to register and run this workflow in", -) -@click.option( - "-d", - "--domain", - required=False, - type=str, - default="development", - help="Domain to register and run this workflow in", -) +@project_option_dec +@domain_option_dec @click.option( "-v", "--version", @@ -125,6 +113,17 @@ def resolve_backfill_window( "backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are " "provided", ) +@click.option( + "--fail-fast/--no-fail-fast", + required=False, + type=bool, + is_flag=True, + default=True, + show_default=True, + help="If set to true, the backfill will fail immediately (WorkflowFailurePolicy.FAIL_IMMEDIATELY) if any of the " + "backfill steps fail. If set to false, the backfill will continue to run even if some of the backfill steps " + "fail (WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE).", +) @click.argument( "launchplan", required=True, @@ -151,6 +150,7 @@ def backfill( parallel: bool, execution_name: str, version: str, + fail_fast: bool, ): from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window) remote = get_and_save_remote_with_click_context(ctx, project, domain) @@ -167,6 +167,9 @@ def backfill( dry_run=dry_run, execute=execute, parallel=parallel, + failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY + if fail_fast + else WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE, ) if dry_run: return diff --git a/flytekit/remote/backfill.py b/flytekit/remote/backfill.py index 2f31889060..b36fc7919d 100644 --- a/flytekit/remote/backfill.py +++ b/flytekit/remote/backfill.py @@ -5,7 +5,7 @@ from croniter import croniter from flytekit import LaunchPlan -from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase +from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase, WorkflowFailurePolicy from flytekit.remote.entities import FlyteLaunchPlan @@ -16,6 +16,7 @@ def create_backfill_workflow( parallel: bool = False, per_node_timeout: timedelta = None, per_node_retries: int = 0, + failure_policy: typing.Optional[WorkflowFailurePolicy] = None, ) -> typing.Tuple[WorkflowBase, datetime, datetime]: """ Generates a new imperative workflow for the launchplan that can be used to backfill the given launchplan. @@ -46,6 +47,7 @@ def create_backfill_workflow( :param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially :param per_node_timeout: timedelta Timeout to use per node :param per_node_retries: int Retries to user per node + :param failure_policy: WorkflowFailurePolicy Failure policy to use for the backfill workflow :return: WorkflowBase, datetime datetime -> New generated workflow, datetime for first instance of backfill, datetime for last instance of backfill """ if not for_lp: @@ -66,8 +68,11 @@ def create_backfill_workflow( else: raise NotImplementedError("Currently backfilling only supports cron schedules.") - logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]") - wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}") + logging.info( + f"Generating backfill from {start_date} -> {end_date}. " + f"Parallel?[{parallel}] FailurePolicy[{str(failure_policy)}]" + ) + wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}", failure_policy=failure_policy) input_name = schedule.kickoff_time_input_arg date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 9c5ad111e5..96ce18f562 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -33,7 +33,7 @@ from flytekit.core.python_auto_container import PythonAutoContainerTask from flytekit.core.reference_entity import ReferenceSpec from flytekit.core.type_engine import LiteralsResolver, TypeEngine -from flytekit.core.workflow import WorkflowBase +from flytekit.core.workflow import WorkflowBase, WorkflowFailurePolicy from flytekit.exceptions import user as user_exceptions from flytekit.exceptions.user import ( FlyteEntityAlreadyExistsException, @@ -1899,6 +1899,7 @@ def launch_backfill( dry_run: bool = False, execute: bool = True, parallel: bool = False, + failure_policy: typing.Optional[WorkflowFailurePolicy] = None, ) -> typing.Optional[FlyteWorkflowExecution, FlyteWorkflow, WorkflowBase]: """ Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified, @@ -1924,12 +1925,15 @@ def launch_backfill( :param dry_run: bool do not register or execute the workflow :param execute: bool Register and execute the wwkflow. :param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially. - + :param failure_policy: WorkflowFailurePolicy (optional) to be used for the newly created workflow. This can + control failure behavior - whether to continue on failure or stop immediately on failure :return: In case of dry-run, return WorkflowBase, else if no_execute return FlyteWorkflow else in the default case return a FlyteWorkflowExecution """ lp = self.fetch_launch_plan(project=project, domain=domain, name=launchplan, version=launchplan_version) - wf, start, end = create_backfill_workflow(start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel) + wf, start, end = create_backfill_workflow( + start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel, failure_policy=failure_policy + ) if dry_run: remote_logger.warning("Dry Run enabled. Workflow will not be registered and or executed.") return wf diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index d3761fb5b3..54b6627b8a 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -13,7 +13,7 @@ from mock import ANY, MagicMock, patch import flytekit.configuration -from flytekit import CronSchedule, LaunchPlan, task, workflow +from flytekit import CronSchedule, LaunchPlan, WorkflowFailurePolicy, task, workflow from flytekit.configuration import Config, DefaultImages, Image, ImageConfig, SerializationSettings from flytekit.core.base_task import PythonTask from flytekit.core.context_manager import FlyteContextManager @@ -355,8 +355,18 @@ def test_launch_backfill(remote): ), ) - wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True) + wf = remote.launch_backfill( + "p", + "d", + start_date, + end_date, + "daily2", + "v1", + dry_run=True, + failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY, + ) assert wf + assert wf.workflow_metadata.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY @mock.patch("flytekit.remote.remote.FlyteRemote.client")