clean up development lifecycle files
Signed-off-by: nikki everett <[email protected]>
nikki everett committed Apr 17, 2024
1 parent 87c1642 commit e1c4c7f
Showing 11 changed files with 101 additions and 461 deletions.
50 changes: 11 additions & 39 deletions docs/user_guide/development_lifecycle/cache_serializing.md
@@ -1,20 +1,3 @@
---
jupytext:
cell_metadata_filter: all
formats: md:myst
main_language: python
notebook_metadata_filter: all
text_representation:
extension: .md
format_name: myst
format_version: 0.13
jupytext_version: 1.16.1
kernelspec:
display_name: Python 3
language: python
name: python3
---

# Cache serializing

```{eval-rst}
@@ -28,47 +11,36 @@ Ensuring serialized evaluation requires a small degree of overhead to coordinate
- Periodically scheduled workflow where a single task evaluation duration may span multiple scheduled executions.
- Running a commonly shared task within different workflows (which receive the same inputs).

+++ {"lines_to_next_cell": 0}
```{note}
To clone and run the example code on this page, see the [Flytesnacks repo][flytesnacks].
```

For any {py:func}`flytekit.task` in Flyte, there is always one required import, which is:

```{code-cell}
from flytekit import task
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache_serialize.py
:caption: development_lifecycle/task_cache_serialize.py
:lines: 1
```

+++ {"lines_to_next_cell": 0}

Task cache serializing is disabled by default to avoid unexpected behavior for task executions. To enable it, use the `cache_serialize` parameter.
`cache_serialize` is a switch to enable or disable serialization of the task.
This operation is only useful for cacheable tasks, where one may reuse output from a previous execution. Flyte requires implicitly enabling the `cache` parameter on all cache serializable tasks.
Cache key definitions follow the same rules as non-serialized cache tasks. It is important to understand the implications of the task signature and `cache_version` parameter in defining cached results.

```{code-cell}
:lines_to_next_cell: 2
@task(cache=True, cache_serialize=True, cache_version="1.0")
def square(n: int) -> int:
    """
    Parameters:
        n (int): name of the parameter for the task will be derived from the name of the input variable.
            The type will be automatically deduced to Types.Integer
    Return:
        int: The label for the output will be automatically assigned, and the type will be deduced from the annotation
    """
    return n * n
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache_serialize.py
:caption: development_lifecycle/task_cache_serialize.py
:pyobject: square
```

In the above example calling `square(n=2)` multiple times concurrently (even in different executions or workflows) will only execute the multiplication operation once.
Concurrently evaluated tasks will wait for completion of the first instance before reusing the cached results and subsequent evaluations will instantly reuse existing cache results.

+++

## How does serializing caches work?

The cache serialize paradigm introduces a new artifact reservation system. Tasks may use this reservation system to acquire an artifact reservation, indicating that they are actively evaluating the task, and release the reservation, once the execution is completed. Flyte uses a clock-skew algorithm to define reservation timeouts. Therefore, tasks are required to periodically extend the reservation during execution.

The first execution of a serializable cached task will successfully acquire the artifact reservation. Execution will be performed as usual and upon completion, the results are written to the cache and reservation is released. Concurrently executed task instances (i.e. in parallel with the initial execution) will observe an active reservation, in which case the execution will wait until the next reevaluation and perform another check. Once the initial execution completes it will reuse the cached results. Subsequently executed task instances (i.e. after an execution has already completed successfully) will immediately reuse the existing cached results.

Flyte handles task execution failures using a timeout on the reservation. If the task currently holding the reservation fails to extend it before it times out, another task may acquire the reservation and begin executing the task.
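
The acquire, extend, release, and timeout behavior described above can be illustrated with a small in-memory sketch. This is not Flyte's implementation: the `ReservationStore` class, its method names, and the explicit `now` argument are hypothetical and exist only to show how a reservation that is not extended in time can be taken over by another task instance.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Reservation:
    owner_id: str      # task instance currently evaluating the task
    expires_at: float  # time after which another instance may take over


class ReservationStore:
    """Toy stand-in for an artifact reservation service (hypothetical)."""

    def __init__(self, timeout_s: float) -> None:
        self.timeout_s = timeout_s
        self._reservations: Dict[str, Reservation] = {}

    def acquire_or_extend(self, cache_key: str, owner_id: str, now: float) -> Reservation:
        """Return the active reservation for cache_key.

        The caller becomes (or stays) the owner if there is no reservation,
        if the existing one has timed out, or if the caller already holds it.
        Otherwise the existing owner's reservation is returned unchanged.
        """
        current: Optional[Reservation] = self._reservations.get(cache_key)
        if current is None or current.expires_at <= now or current.owner_id == owner_id:
            current = Reservation(owner_id=owner_id, expires_at=now + self.timeout_s)
            self._reservations[cache_key] = current
        return current

    def release(self, cache_key: str, owner_id: str) -> None:
        """Drop the reservation, but only if the caller is the owner."""
        current = self._reservations.get(cache_key)
        if current is not None and current.owner_id == owner_id:
            del self._reservations[cache_key]
```

In this sketch, a waiting instance would call `acquire_or_extend` on each reevaluation and proceed only once it either finds cached results or becomes the owner itself.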

[flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/development_lifecycle/
136 changes: 28 additions & 108 deletions docs/user_guide/development_lifecycle/caching.md
@@ -1,20 +1,3 @@
---
jupytext:
cell_metadata_filter: all
formats: md:myst
main_language: python
notebook_metadata_filter: all
text_representation:
extension: .md
format_name: myst
format_version: 0.13
jupytext_version: 1.16.1
kernelspec:
display_name: Python 3
language: python
name: python3
---

# Caching

```{eval-rst}
@@ -29,33 +12,29 @@ Task caching is useful when a user knows that many executions with the same inpu
- Running the code multiple times when debugging workflows
- Running the commonly shared tasks amongst different workflows, which receive the same inputs

Let's watch a brief explanation of caching and a demo in this video, followed by how task caching can be enabled .
Let's watch a brief explanation of caching and a demo in this video, followed by how task caching can be enabled.

```{eval-rst}
.. youtube:: WNkThCp-gqo
```

+++ {"lines_to_next_cell": 0}

Import the necessary libraries.
```{note}
To clone and run the example code on this page, see the [Flytesnacks repo][flytesnacks].
```

```{code-cell}
import time
Import the necessary libraries:

import pandas
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:lines: 1-3
```

+++ {"lines_to_next_cell": 0}

For any {py:func}`flytekit.task` in Flyte, there is always one required import, which is:

```{code-cell}
:lines_to_next_cell: 1
from flytekit import HashMethod, task, workflow
from flytekit.core.node_creation import create_node
from typing_extensions import Annotated
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:lines: 8-10
```

Task caching is disabled by default to avoid unintended consequences of caching tasks with side effects. To enable caching and control its behavior, use the `cache` and `cache_version` parameters when constructing a task.
@@ -64,26 +43,14 @@ Task caching is disabled by default to avoid unintended consequences of caching
Bumping the `cache_version` is akin to invalidating the cache.
You can manually update this version and Flyte caches the next execution instead of relying on the old cache.

```{code-cell}
@task(cache=True, cache_version="1.0")  # noqa: F841
def square(n: int) -> int:
    """
    Parameters:
        n (int): name of the parameter for the task will be derived from the name of the input variable.
            The type will be automatically deduced to ``Types.Integer``.
    Return:
        int: The label for the output will be automatically assigned, and the type will be deduced from the annotation.
    """
    return n * n
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:pyobject: square
```

In the above example, calling `square(n=2)` twice (even if it's across different executions or different workflows) will only execute the multiplication operation once.
The next time, the output will be made available immediately since it is captured from the previous execution with the same inputs.

+++

If in a subsequent code update, you update the signature of the task to return the original number along with the result, it'll automatically invalidate the cache (even though the cache version remains the same).

```python
@@ -92,8 +59,6 @@ def square(n: int) -> Tuple[int, int]:
...
```

+++

:::{note}
If the user changes the task interface in any way (such as adding, removing, or editing inputs/outputs), Flyte treats that as a task functionality change. In the subsequent execution, Flyte runs the task and stores the outputs as newly cached values.
:::
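
To see why both a `cache_version` bump and an interface change lead to a cache miss, consider the following toy sketch of a cache key. It is only an illustration under stated assumptions: the `cache_key` helper and the fields it hashes are hypothetical and do not reproduce how Flyte actually computes or stores its keys.

```python
import hashlib
import json
from typing import Any, Dict


def cache_key(task_name: str, cache_version: str, signature: str, inputs: Dict[str, Any]) -> str:
    """Hypothetical cache key: a digest over everything that identifies a cached result."""
    payload = json.dumps(
        {
            "task": task_name,
            "version": cache_version,
            "signature": signature,
            "inputs": inputs,
        },
        sort_keys=True,  # stable ordering so equal inputs give equal keys
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


original = cache_key("square", "1.0", "(n: int) -> int", {"n": 2})
same_call = cache_key("square", "1.0", "(n: int) -> int", {"n": 2})
bumped_version = cache_key("square", "2.0", "(n: int) -> int", {"n": 2})
new_signature = cache_key("square", "1.0", "(n: int) -> Tuple[int, int]", {"n": 2})
```

Here `original` and `same_call` are equal, so the second call would be a cache hit, while `bumped_version` and `new_signature` each differ from `original` and would therefore trigger a fresh execution.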
@@ -137,30 +102,21 @@ The format used by the store is opaque and not meant to be inspectable.

The default behavior displayed by Flyte's memoization feature might not match the user intuition. For example, this code makes use of pandas dataframes:

```{code-cell}
@task
def foo(a: int, b: str) -> pandas.DataFrame:
    df = pandas.DataFrame(...)
    ...
    return df


@task(cache=True, cache_version="1.0")
def bar(df: pandas.DataFrame) -> int:
    ...


@workflow
def wf(a: int, b: str):
    df = foo(a=a, b=b)
    v = bar(df=df)  # noqa: F841
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:lines: 39-54
```

If run twice with the same inputs, one would expect that `bar` would trigger a cache hit, but it turns out that's not the case because of how dataframes are represented in Flyte.
However, with release 1.2.0, Flyte provides a new way to control memoization behavior of literals. This is done via a `typing.Annotated` call on the task signature.
For example, in order to cache the result of calls to `bar`, you can rewrite the code above like this:

```{code-cell}
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:lines:
```

```python
def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

@@ -194,47 +150,11 @@ This is done by turning the literal representation into a string and using that

This feature also works in local execution.

+++

Here's a complete example of the feature:

```{code-cell}
def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))


@task
def uncached_data_reading_task() -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    return pandas.DataFrame({"column_1": [1, 2, 3]})


@task(cache=True, cache_version="1.0")
def cached_data_processing_task(df: pandas.DataFrame) -> pandas.DataFrame:
    time.sleep(1)
    return df * 2


@task
def compare_dataframes(df1: pandas.DataFrame, df2: pandas.DataFrame):
    assert df1.equals(df2)


@workflow
def cached_dataframe_wf():
    raw_data = uncached_data_reading_task()
    # Execute `cached_data_processing_task` twice, but force those
    # two executions to happen serially to demonstrate how the second run
    # hits the cache.
    t1_node = create_node(cached_data_processing_task, df=raw_data)
    t2_node = create_node(cached_data_processing_task, df=raw_data)
    t1_node >> t2_node
    # Confirm that the dataframes actually match
    compare_dataframes(df1=t1_node.o0, df2=t2_node.o0)


if __name__ == "__main__":
    df1 = cached_dataframe_wf()
    print(f"Running cached_dataframe_wf once : {df1}")
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/development_lifecycle/development_lifecycle/task_cache.py
:caption: development_lifecycle/task_cache.py
:lines: 97-134
```

[flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/development_lifecycle/
17 changes: 0 additions & 17 deletions docs/user_guide/development_lifecycle/creating_a_new_project.md
@@ -1,20 +1,3 @@
---
jupytext:
cell_metadata_filter: all
formats: md:myst
main_language: python
notebook_metadata_filter: all
text_representation:
extension: .md
format_name: myst
format_version: 0.13
jupytext_version: 1.16.1
kernelspec:
display_name: Python 3
language: python
name: python3
---

# Creating a new project

Creates project to be used as a home for the flyte resources of tasks and workflows.
17 changes: 0 additions & 17 deletions docs/user_guide/development_lifecycle/debugging_executions.md
@@ -1,20 +1,3 @@
---
jupytext:
cell_metadata_filter: all
formats: md:myst
main_language: python
notebook_metadata_filter: all
text_representation:
extension: .md
format_name: myst
format_version: 0.13
jupytext_version: 1.16.1
kernelspec:
display_name: Python 3
language: python
name: python3
---

# Debugging executions

The inspection of task and workflow execution would provide you log links to debug things further