Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up and move user guide Python files #1584

Merged
merged 19 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[flake8]
max-line-length = 180
extend-ignore = E203, E266, E501, W503, E741
extend-ignore = E203, E266, E402, E501, W503, E741
exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,venv/*,src/*,.rst,build
max-complexity=16
3 changes: 3 additions & 0 deletions example_code/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Flytesnacks example code

A repository of runnable example code for Flyte.
5 changes: 5 additions & 0 deletions example_code/advanced_composition/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Advanced Composition

These examples introduce the advanced features of the Flytekit Python SDK.
They cover more complex aspects of Flyte, including conditions, subworkflows,
dynamic workflows, map tasks, gate nodes and more.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from flytekit import task, workflow


@task
def t2():
print("Running t2")
return


@task
def t1():
print("Running t1")
return


@task
def t0():
print("Running t0")
return


# Chaining tasks
@workflow
def chain_tasks_wf():
t2_promise = t2()
t1_promise = t1()
t0_promise = t0()

t0_promise >> t1_promise
t1_promise >> t2_promise


# Chaining subworkflows
@workflow
def sub_workflow_1():
t1()


@workflow
def sub_workflow_0():
t0()


@workflow
def chain_workflows_wf():
sub_wf1 = sub_workflow_1()
sub_wf0 = sub_workflow_0()

sub_wf0 >> sub_wf1
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from flytekit import current_context, task, workflow
from flytekit.exceptions.user import FlyteRecoverableException

RETRIES = 3


# Define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures.
@task(retries=RETRIES)
def use_checkpoint(n_iterations: int) -> int:
cp = current_context().checkpoint
prev = cp.read()

start = 0
if prev:
start = int(prev.decode())

# Create a failure interval to simulate failures across 'n' iterations and then succeed after configured retries
failure_interval = n_iterations // RETRIES
index = 0
for index in range(start, n_iterations):
# Simulate a deterministic failure for demonstration. Showcasing how it eventually completes within the given retries
if index > start and index % failure_interval == 0:
raise FlyteRecoverableException(f"Failed at iteration {index}, failure_interval {failure_interval}.")
# Save progress state. It is also entirely possible to save state every few intervals
cp.write(f"{index + 1}".encode())
return index


# Create a workflow that invokes the task.
# The task will automatically undergo retries in the event of a FlyteRecoverableException.
@workflow
def checkpointing_example(n_iterations: int) -> int:
return use_checkpoint(n_iterations=n_iterations)


# The local checkpoint is not utilized here because retries are not supported.
if __name__ == "__main__":
try:
checkpointing_example(n_iterations=10)
except RuntimeError as e: # noqa : F841
# Since no retries are performed, an exception is expected when run locally
pass
Original file line number Diff line number Diff line change
@@ -1,32 +1,13 @@
# %% [markdown]
# (conditional)=
#
# # Conditional
#
# ```{eval-rst}
# .. tags:: Intermediate
# ```
#
# Flytekit elevates conditions to a first-class construct named `conditional`, providing a powerful mechanism for selectively
# executing branches in a workflow. Conditions leverage static or dynamic data generated by tasks or
# received as workflow inputs. While conditions are highly performant in their evaluation,
# it's important to note that they are restricted to specific binary and logical operators
# and are applicable only to primitive values.
#
# To begin, import the necessary libraries.
# %%
import random

from flytekit import conditional, task, workflow


# %% [markdown]
# ## Simple branch
# Simple branch
#
# In this example, we introduce two tasks, `calculate_circle_circumference` and
# `calculate_circle_area`. The workflow dynamically chooses between these tasks based on whether the input
# falls within the fraction range (0-1) or not.
# %%
@task
def calculate_circle_circumference(radius: float) -> float:
return 2 * 3.14 * radius # Task to calculate the circumference of a circle
Expand Down Expand Up @@ -56,13 +37,11 @@ def shape_properties(radius: float) -> float:
print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")


# %% [markdown]
# ## Multiple branches
# Multiple branches
#
# We establish an `if` condition with multiple branches, which will result in a failure if none of the conditions is met.
# It's important to note that any `conditional` statement in Flyte is expected to be complete,
# meaning that all possible branches must be accounted for.
# %%
@workflow
def shape_properties_with_multiple_branches(radius: float) -> float:
return (
Expand All @@ -76,17 +55,14 @@ def shape_properties_with_multiple_branches(radius: float) -> float:
)


# %% [markdown]
# :::{note}
# Take note of the usage of bitwise operators (`&`). Due to Python's PEP-335,
# the logical `and`, `or` and `not` operators cannot be overloaded.
# Flytekit employs bitwise `&` and `|` as equivalents for logical `and` and `or` operators,
# a convention also observed in other libraries.
# :::
#
# ## Consuming the output of a conditional
# Consuming the output of a conditional
#
# Here, we write a task that consumes the output returned by a `conditional`.
# %%
@workflow
def shape_properties_accept_conditional_output(radius: float) -> float:
result = (
Expand All @@ -105,13 +81,11 @@ def shape_properties_accept_conditional_output(radius: float) -> float:
print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}")


# %% [markdown]
# ## Using the output of a previous task in a conditional
# Using the output of a previous task in a conditional
#
# You can check if a boolean returned from the previous task is `True`,
# but unary operations are not supported directly. Instead, use the `is_true`,
# `is_false` and `is_none` methods on the result.
# %%
@task
def coin_toss(seed: int) -> bool:
"""
Expand Down Expand Up @@ -145,29 +119,13 @@ def boolean_wf(seed: int = 5) -> int:
return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())


# %% [markdown]
# :::{note}
# *How do output values acquire these methods?* In a workflow, direct access to outputs is not permitted.
# Inputs and outputs are automatically encapsulated in a special object known as {py:class}`flytekit.extend.Promise`.
# :::
#
# ## Using boolean workflow inputs in a conditional
# You can directly pass a boolean to a workflow.
# %%
# Using boolean workflow inputs in a conditional
@workflow
def boolean_input_wf(boolean_input: bool) -> int:
return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())


# %% [markdown]
# :::{note}
# Observe that the passed boolean possesses a method called `is_true`.
# This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object.
# This special object enables it to exhibit additional behavior.
# :::
#
# You can run the workflows locally as follows:
# %%
# Run the workflow locally
if __name__ == "__main__":
print("Running boolean_wf a few times...")
for index in range(0, 5):
Expand All @@ -177,12 +135,10 @@ def boolean_input_wf(boolean_input: bool) -> int:
)


# %% [markdown]
# ## Nested conditionals
# Nested conditionals
#
# You can nest conditional sections arbitrarily inside other conditional sections.
# However, these nested sections can only be in the `then` part of a `conditional` block.
# %%
@workflow
def nested_conditions(radius: float) -> float:
return (
Expand All @@ -208,12 +164,7 @@ def nested_conditions(radius: float) -> float:
print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")


# %% [markdown]
# ## Using the output of a task in a conditional
#
# Let's write a fun workflow that triggers the `calculate_circle_circumference` task in the event of a "heads" outcome,
# and alternatively, runs the `calculate_circle_area` task in the event of a "tail" outcome.
# %%
# Using the output of a task in a conditional
@workflow
def consume_task_output(radius: float, seed: int = 5) -> float:
is_heads = coin_toss(seed=seed)
Expand All @@ -226,9 +177,7 @@ def consume_task_output(radius: float, seed: int = 5) -> float:
)


# %% [markdown]
# You can run the workflow locally as follows:
# %%
# Run the workflow locally
if __name__ == "__main__":
default_seed_output = consume_task_output(radius=0.4)
print(
Expand All @@ -237,50 +186,3 @@ def consume_task_output(radius: float, seed: int = 5) -> float:

custom_seed_output = consume_task_output(radius=0.4, seed=7)
print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}")

# %% [markdown]
# ## Run the example on the Flyte cluster
#
# To run the provided workflows on the Flyte cluster, use the following commands:
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties --radius 3.0
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties_with_multiple_branches --radius 11.0
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties_accept_conditional_output --radius 0.5
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# boolean_wf
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# boolean_input_wf --boolean_input
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# nested_conditions --radius 0.7
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# consume_task_output --radius 0.4 --seed 7
# ```
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from functools import partial, wraps

from flytekit import task, workflow

# Create a logger to monitor the execution's progress.
logger = logging.getLogger(__file__)


# Using a single decorator
def log_io(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
out = fn(*args, **kwargs)
logger.info(f"task {fn.__name__} output: {out}")
return out

return wrapper


# Create a task named `t1` that is decorated with `log_io`.
@task
@log_io
def t1(x: int) -> int:
return x + 1


# Stacking multiple decorators
def validate_output(fn=None, *, floor=0):
@wraps(fn)
def wrapper(*args, **kwargs):
out = fn(*args, **kwargs)
if out <= floor:
raise ValueError(f"output of task {fn.__name__} must be a positive number, found {out}")
return out

if fn is None:
return partial(validate_output, floor=floor)

return wrapper


# Define a function that uses both the logging and validator decorators
@task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
return x + 10


# Compose a workflow that calls `t1` and `t2`
@workflow
def decorating_task_wf(x: int) -> int:
return t2(x=t1(x=x))


if __name__ == "__main__":
print(f"Running decorating_task_wf(x=10) {decorating_task_wf(x=10)}")
Loading
Loading