Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfill command now supports failure-policy #1840

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import rich_click as click

from flytekit import WorkflowFailurePolicy

Check warning on line 6 in flytekit/clis/sdk_in_container/backfill.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/sdk_in_container/backfill.py#L6

Added line #L6 was not covered by tests
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec

Check warning on line 8 in flytekit/clis/sdk_in_container/backfill.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clis/sdk_in_container/backfill.py#L8

Added line #L8 was not covered by tests
from flytekit.interaction.click_types import DateTimeType, DurationParamType

_backfill_help = """
Expand Down Expand Up @@ -42,22 +44,8 @@


@click.command("backfill", help=_backfill_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Domain to register and run this workflow in",
)
@project_option_dec
@domain_option_dec
@click.option(
"-v",
"--version",
Expand Down Expand Up @@ -125,6 +113,17 @@
"backfills between. This is needed with from-date / to-date. Optional if both from-date and to-date are "
"provided",
)
@click.option(
"--fail-fast/--no-fail-fast",
required=False,
type=bool,
is_flag=True,
default=True,
show_default=True,
help="If set to true, the backfill will fail immediately (WorkflowFailurePolicy.FAIL_IMMEDIATELY) if any of the "
"backfill steps fail. If set to false, the backfill will continue to run even if some of the backfill steps "
"fail (WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE).",
)
@click.argument(
"launchplan",
required=True,
Expand All @@ -151,6 +150,7 @@
parallel: bool,
execution_name: str,
version: str,
fail_fast: bool,
):
from_date, to_date = resolve_backfill_window(from_date, to_date, backfill_window)
remote = get_and_save_remote_with_click_context(ctx, project, domain)
Expand All @@ -167,6 +167,9 @@
dry_run=dry_run,
execute=execute,
parallel=parallel,
failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY
if fail_fast
else WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
if dry_run:
return
Expand Down
11 changes: 8 additions & 3 deletions flytekit/remote/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from croniter import croniter

from flytekit import LaunchPlan
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase
from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase, WorkflowFailurePolicy
from flytekit.remote.entities import FlyteLaunchPlan


Expand All @@ -16,6 +16,7 @@
parallel: bool = False,
per_node_timeout: timedelta = None,
per_node_retries: int = 0,
failure_policy: typing.Optional[WorkflowFailurePolicy] = None,
) -> typing.Tuple[WorkflowBase, datetime, datetime]:
"""
Generates a new imperative workflow for the launchplan that can be used to backfill the given launchplan.
Expand Down Expand Up @@ -46,6 +47,7 @@
:param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially
:param per_node_timeout: timedelta Timeout to use per node
:param per_node_retries: int Retries to user per node
:param failure_policy: WorkflowFailurePolicy Failure policy to use for the backfill workflow
:return: WorkflowBase, datetime datetime -> New generated workflow, datetime for first instance of backfill, datetime for last instance of backfill
"""
if not for_lp:
Expand All @@ -66,8 +68,11 @@
else:
raise NotImplementedError("Currently backfilling only supports cron schedules.")

logging.info(f"Generating backfill from {start_date} -> {end_date}. Parallel?[{parallel}]")
wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}")
logging.info(

Check warning on line 71 in flytekit/remote/backfill.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/backfill.py#L71

Added line #L71 was not covered by tests
f"Generating backfill from {start_date} -> {end_date}. "
f"Parallel?[{parallel}] FailurePolicy[{str(failure_policy)}]"
)
wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}", failure_policy=failure_policy)

Check warning on line 75 in flytekit/remote/backfill.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/backfill.py#L75

Added line #L75 was not covered by tests

input_name = schedule.kickoff_time_input_arg
date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime)
Expand Down
10 changes: 7 additions & 3 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flytekit.core.python_auto_container import PythonAutoContainerTask
from flytekit.core.reference_entity import ReferenceSpec
from flytekit.core.type_engine import LiteralsResolver, TypeEngine
from flytekit.core.workflow import WorkflowBase
from flytekit.core.workflow import WorkflowBase, WorkflowFailurePolicy
from flytekit.exceptions import user as user_exceptions
from flytekit.exceptions.user import (
FlyteEntityAlreadyExistsException,
Expand Down Expand Up @@ -1899,6 +1899,7 @@
dry_run: bool = False,
execute: bool = True,
parallel: bool = False,
failure_policy: typing.Optional[WorkflowFailurePolicy] = None,
) -> typing.Optional[FlyteWorkflowExecution, FlyteWorkflow, WorkflowBase]:
"""
Creates and launches a backfill workflow for the given launchplan. If launchplan version is not specified,
Expand All @@ -1924,12 +1925,15 @@
:param dry_run: bool do not register or execute the workflow
:param execute: bool Register and execute the wwkflow.
:param parallel: if the backfill should be run in parallel. False (default) will run each bacfill sequentially.

:param failure_policy: WorkflowFailurePolicy (optional) to be used for the newly created workflow. This can
control failure behavior - whether to continue on failure or stop immediately on failure
:return: In case of dry-run, return WorkflowBase, else if no_execute return FlyteWorkflow else in the default
case return a FlyteWorkflowExecution
"""
lp = self.fetch_launch_plan(project=project, domain=domain, name=launchplan, version=launchplan_version)
wf, start, end = create_backfill_workflow(start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel)
wf, start, end = create_backfill_workflow(

Check warning on line 1934 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L1934

Added line #L1934 was not covered by tests
start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel, failure_policy=failure_policy
)
if dry_run:
remote_logger.warning("Dry Run enabled. Workflow will not be registered and or executed.")
return wf
Expand Down
14 changes: 12 additions & 2 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from mock import ANY, MagicMock, patch

import flytekit.configuration
from flytekit import CronSchedule, LaunchPlan, task, workflow
from flytekit import CronSchedule, LaunchPlan, WorkflowFailurePolicy, task, workflow

Check warning on line 16 in tests/flytekit/unit/remote/test_remote.py

View check run for this annotation

Codecov / codecov/patch

tests/flytekit/unit/remote/test_remote.py#L16

Added line #L16 was not covered by tests
from flytekit.configuration import Config, DefaultImages, Image, ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContextManager
Expand Down Expand Up @@ -355,8 +355,18 @@
),
)

wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True)
wf = remote.launch_backfill(

Check warning on line 358 in tests/flytekit/unit/remote/test_remote.py

View check run for this annotation

Codecov / codecov/patch

tests/flytekit/unit/remote/test_remote.py#L358

Added line #L358 was not covered by tests
"p",
"d",
start_date,
end_date,
"daily2",
"v1",
dry_run=True,
failure_policy=WorkflowFailurePolicy.FAIL_IMMEDIATELY,
)
assert wf
assert wf.workflow_metadata.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY

Check warning on line 369 in tests/flytekit/unit/remote/test_remote.py

View check run for this annotation

Codecov / codecov/patch

tests/flytekit/unit/remote/test_remote.py#L369

Added line #L369 was not covered by tests


@mock.patch("flytekit.remote.remote.FlyteRemote.client")
Expand Down
Loading