Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async tasks and eager revamp #2927

Merged
merged 48 commits into from
Dec 9, 2024
Merged

Async tasks and eager revamp #2927

merged 48 commits into from
Dec 9, 2024

Conversation

wild-endeavor
Copy link
Contributor

@wild-endeavor wild-endeavor commented Nov 13, 2024

To support the growing interest to support Python async style programming, and to bring the eager mode out of experimental, this pr revamps the call patters in eager to support native Python async semantics. This PR also introduces the concept of async tasks. Note that while documentation refers to eager entities as "workflows", using the @eager decorator produces a task, not a workflow, so they'll continue to show up under the Tasks tab of the Flyte UI.

This is a backwards incompatible change for @eager.

Usage

Below are some examples of how you might call the new eager workflow.

Patterns

Simple Example

This is the simplest case. An eager workflow invokes a simple task. The container running the eager function will reach out to the control plane, and kick off a single-task execution.

@task
def add_one(x: int) -> int:  
    return x + 1  
  
  
@eager
async def simple_eager_workflow(x: int) -> int:  
    # This is the normal way of calling tasks. Call normal tasks in an effectively async way by hanging and waiting for  
    # the result.    out = add_one(x=x)  
  
    return out

This is akin to an async function in Python calling a synchronous function. It will automatically block until the results are available.

Controlling Execution

This example shows how you might do work at the same time as a task kicked off by eager is running. Again this follows the same semantics as Python. In Python the executing function also needs to relinquish control of the CPU by calling an await so that other items on the loop can progress.

@eager
async def simple_await() -> int:
    t1 = asyncio.create_task(add_one(x=10))

	# This allows the loop to run in the background, actually kicking
	# off the execution
    await asyncio.sleep(0)
    # <can do more CPU intensive things here while>
  
    # don't forget the comma if just awaiting one
    i1, = await asyncio.gather(t1)  # can have more of course
    return i1

Nested Example

Eager workflows can also be nested. If an eager workflow encounters another eager workflow, it will be launched against Admin as a single task execution, just like any other task.

@task
def add_one(x: int) -> int:
    return x + 1

@task
def double(x: int) -> int:  
    return x * 2  

@dynamic
def level_3_dt(x: int) -> int:
    out = add_one(x=x)
    return out

@eager
async def level_2(x: int) -> int:
    out = add_one(x=x)
    level_3_res = level_3_dt(x=out)
    final_res = double(x=level_3_res)
    return final_res

@eager
async def level_1() -> typing.Tuple[int, int]:
    i1 = add_one(x=10)
    t2 = asyncio.create_task(level_2(x=1))

    i2 = await t2
    return i1, i2

Errors

If an eager task runs a task on a remote Flyte cluster and that task fails, there is no way to recreate the exact type (AssertionError, ValueError, etc.) that caused the failure, so all failures are interpreted as an EagerException. This behavior is the same as before, but the import location has changed

from flytekit import EagerException

@eager  
async def base_wf(x: int) -> int:  
	try:
	    out = add_one(x=x)
	except EagerException as ee:
	    out = add_two(x=x)

Developer/Admin Notes

Naming/Searching

When an eager task launches downstream entities the execution names are deterministic. A hash is made from the current eager task's execution ID, the entity type and name being run, the call order (if a task is called multiple times) and the inputs. This makes the execution idempotent for future recovery work.

Labels

Two labels are attached to executions launched by an eager execution, the current eager execution's name (under eager-exec), and the root eager execution's name (under root-eager-exec in the case of nested eager tasks).

Signal Handling

A signal handler gets added when an eager task runs in the backend, and listens to sigint (ctrl-c) and sigterm (the signal that is sent by K8s when the pod is deleted, aka kill). The handler will iterate through all the executions and terminate anything that's not already in a terminal state.

Changes

High level changes

  • Addition of a new AsyncPythonFunctionTask and EagerAsyncPythonFunctionTask.
  • Introduced two new modes, one for eager local execution, and one for eager backend execution. This made more sense than adding an eager execution mode like how dynamic is done.
  • Added an async version of the call handler - the main flyte_entity_call_handler and this new async call handler now call each other recursively.
  • Updated main call handler to now allow the calling of other flyte entities inside an eager task.
  • Introduced two new Execution modes, one for local eager and one for eager backend run.
  • Add as_python_native to the LiteralsResolver so it can produce proper un-packable outputs as the result of calling flyte entities.
  • Adding two classes for the watching of executions launched by an eager task - Controller and Informer (not set on the names, these are not user facing so we can change later)

Note

A note for developers, originally we were only going to have the async version of the call handler, but this failed because of the way flytekit/Python async works. Because we cannot guarantee that we'll only ever make the jump from async to sync once, async functions are run on a separate thread in flytekit. We ran into issues where plugins and other libraries depended on being run in the main Python thread (one example is signal handling for instance). The synchronous version of the call handler was added back for this reason, to keep the non-eager, non-async call flow effectively the same as before.

Other items

  • For agents, change signal handling by redirecting it through the context manager for now, but more work could be done here to make it more rigorous.
  • Added an exception handler to the main asyn.py loop_manager to log errors so they're more visible.
  • Updated idl requirement to pick up new is_eager bit.

Remaining items

These probably make more sense to do after this is merged. Eager should be considered not completely usable until these are in.

  • Introduce async workflows that compile the same way as normal workflows, but are callable within other async workflows, and can call eager tasks directly.
  • Documentation updates outside of code comments

Investigate

Things that came up in the course of this PR.

  • Not related to eager but the naming of entities changed drastically if i was two folders up from the code rather than one.

How was this patch tested?

Tested locally using sandbox. More testing needed in deployed environments.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Copy link

codecov bot commented Nov 14, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 90.05%. Comparing base (47fe660) to head (802bae1).
Report is 6 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2927       +/-   ##
===========================================
+ Coverage   51.35%   90.05%   +38.70%     
===========================================
  Files         200       85      -115     
  Lines       20940     3820    -17120     
  Branches     2697        0     -2697     
===========================================
- Hits        10753     3440     -7313     
+ Misses       9586      380     -9206     
+ Partials      601        0      -601     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@wild-endeavor wild-endeavor changed the title Async/tasks wip Async/tasks Nov 14, 2024
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
deck and exceptions

Signed-off-by: Yee Hing Tong <[email protected]>
… the launch function, pass the work item directly so that exceptions can always be set, unit tests

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Kevin Su <[email protected]>


@pytest.mark.sandbox_test
def test_easy_2():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should test 2 concurrent eager executions running in the same python vm but are independent

For example
exec-1 -> eager
exec-2 -> eager -> eager

Logically these are independent executions

Copy link
Contributor Author

@wild-endeavor wild-endeavor Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me make an issue for this, and some of the other todos that came up. will have to overhaul the flyte context & manager. I think the scenario you're thinking of can be described like this.

@task
def t1():
    print("normal task")

@eager
def eg_1():
    some_other_task()

@eager
def eg_p():
    t1()
    eg_1()

The issue is that the FlyteContext is a shared global across all coroutines (it's a thread local, not a coroutine local) which means that when t1's call handler runs, it'll set the execution state one way, and when eg_1's call handler runs it'll try to set it another way (actually it'll fail because it'll think it's running inside t1 rather than eg_p). What we want is a sort of tree of context objects rather than the list it is today.

Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
@wild-endeavor wild-endeavor changed the title wip Async/tasks Async tasks and eager revamp Dec 4, 2024
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
eapolinario
eapolinario previously approved these changes Dec 9, 2024
Copy link
Collaborator

@eapolinario eapolinario left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is amazing.

Signed-off-by: Yee Hing Tong <[email protected]>
@wild-endeavor wild-endeavor merged commit 276c464 into master Dec 9, 2024
100 of 102 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants