Skip to content

Commit

Permalink
Async/tasks agent (#2955)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 26, 2024
1 parent 65df34a commit af631bc
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 185 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
from flytekit.core.reference_entity import LaunchPlanReference, TaskReference, WorkflowReference
from flytekit.core.resources import Resources
from flytekit.core.schedule import CronSchedule, FixedRate
from flytekit.core.task import Secret, reference_task, task
from flytekit.core.task import Secret, eager, reference_task, task
from flytekit.core.type_engine import BatchSize
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
Expand Down
15 changes: 15 additions & 0 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging as _logging
import os
import pathlib
import signal
import tempfile
import traceback
import typing
Expand All @@ -24,6 +25,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from types import FrameType
from typing import Generator, List, Optional, Union

from flytekit.configuration import Config, SecretsConfig, SerializationSettings
Expand Down Expand Up @@ -896,6 +898,12 @@ class FlyteContextManager(object):
FlyteContextManager.pop_context()
"""

# Callbacks run, in registration order, by the SIGINT handler that initialize() installs.
signal_handlers: typing.List[typing.Callable[[int, FrameType], typing.Any]] = []

@staticmethod
def add_signal_handler(handler: typing.Callable[[int, FrameType], typing.Any]):
    """Register a callback to be run when the process-wide signal handler fires.

    :param handler: Callable taking the signal number and the current stack frame. It is appended to the
        class-level ``signal_handlers`` list, so registrations are shared by every user of the class.
    """
    registered = FlyteContextManager.signal_handlers
    registered.append(handler)

@staticmethod
def get_origin_stackframe(limit=2) -> traceback.FrameSummary:
ss = traceback.extract_stack(limit=limit + 1)
Expand Down Expand Up @@ -979,6 +987,13 @@ def initialize():
user_space_path = os.path.join(cfg.local_sandbox_path, "user_space")
pathlib.Path(user_space_path).mkdir(parents=True, exist_ok=True)

def main_signal_handler(signum: int, frame: typing.Optional[FrameType]):
    """Run every registered signal handler, then terminate the process with exit status 1.

    :param signum: Number of the signal that was received.
    :param frame: Stack frame that was executing when the signal arrived; the interpreter may pass ``None``.
    :raises SystemExit: Always, with code 1, once all registered handlers have returned.
    """
    for handler in FlyteContextManager.signal_handlers:
        handler(signum, frame)
    # Raise SystemExit directly instead of calling the ``exit`` builtin: ``exit`` is injected by the ``site``
    # module for interactive sessions and is not available when the interpreter runs without ``site``.
    raise SystemExit(1)

signal.signal(signal.SIGINT, main_signal_handler)

# Note we use the SdkWorkflowExecution object purely for formatting into the ex:project:domain:name format users
# are already acquainted with
default_context = FlyteContext(file_access=default_local_file_access_provider)
Expand Down
5 changes: 2 additions & 3 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ def execute(self, **kwargs) -> Any:
handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator.
"""
if self.execution_mode == self.ExecutionBehavior.DEFAULT:
# todo:async run task function in a runner if necessary.
return self._task_function(**kwargs)
elif self.execution_mode == self.ExecutionBehavior.DYNAMIC:
return self.dynamic_execute(self._task_function, **kwargs)
Expand Down Expand Up @@ -425,7 +424,6 @@ async def async_execute(self, *args, **kwargs) -> Any:
# Args is present because the asyn helper function passes it, but everything should be in kwargs by this point
assert not args
if self.execution_mode == self.ExecutionBehavior.DEFAULT:
# todo:async run task function in a runner if necessary.
return await self._task_function(**kwargs)
elif self.execution_mode == self.ExecutionBehavior.DYNAMIC:
raise NotImplementedError
Expand Down Expand Up @@ -618,7 +616,8 @@ async def run_with_backend(self, **kwargs):


"""
merge master again
match signal handler pattern and re-test
local execution
pure watch informer pattern
priority for flytekit - fix naming, depending on src
Expand Down
122 changes: 120 additions & 2 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import inspect
import os
from functools import update_wrapper
from functools import partial, update_wrapper
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, overload

from typing_extensions import ParamSpec # type: ignore
Expand All @@ -13,7 +13,7 @@
from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin
from flytekit.core.interface import Interface, output_name_generator, transform_function_to_interface
from flytekit.core.pod_template import PodTemplate
from flytekit.core.python_function_task import AsyncPythonFunctionTask, PythonFunctionTask
from flytekit.core.python_function_task import AsyncPythonFunctionTask, EagerAsyncPythonFunctionTask, PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.core.utils import str2bool
Expand Down Expand Up @@ -504,3 +504,121 @@ def execute(self, **kwargs) -> Any:
return values[0]
else:
return tuple(values)


def eager(
    _fn=None,
    *args,
    **kwargs,
) -> Union[EagerAsyncPythonFunctionTask, partial]:
    """Eager workflow decorator.

    Can be applied bare (``@eager``) or with keyword arguments (``@eager(...)``). In the second form the
    decorator is first called without a function and returns a :py:func:`functools.partial` that carries the
    keyword arguments and is then applied to the decorated function.

    None of the options below are named in the signature; they are accepted through ``**kwargs``.

    :param _fn: The ``async`` function to wrap, or ``None`` when the decorator is called with keyword arguments
        only.
    :param args: Extra positional arguments. They are accepted but ignored.
    :param remote: A :py:class:`~flytekit.remote.FlyteRemote` object to use for executing Flyte entities.
    :param client_secret_group: The client secret group to use for this workflow.
    :param client_secret_key: The client secret key to use for this workflow.
    :param timeout: The timeout duration specifying how long to wait for a task/workflow execution within the eager
        workflow to complete or terminate. By default, the eager workflow will wait indefinitely until complete.
    :param poll_interval: The poll interval for checking if a task/workflow execution within the eager workflow has
        finished. If not specified, the default poll interval is 6 seconds.
    :param local_entrypoint: If True, the eager workflow can be executed locally but will use the provided
        :py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local
        testing against a remote Flyte cluster.
    :param client_secret_env_var: If specified, binds the client secret to the specified environment variable for
        remote authentication.
    :param kwargs: Keyword arguments forwarded to ``EagerAsyncPythonFunctionTask``. Any ``enable_deck`` value is
        discarded because the task is always created with ``enable_deck=True``.
    :return: An ``EagerAsyncPythonFunctionTask`` wrapping ``_fn``, or a :py:func:`functools.partial` of this
        decorator when ``_fn`` is ``None``.

    This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be
    used inside of an ``@eager``-decorated function. This is because eager workflows use a
    :py:class:`~flytekit.remote.remote.FlyteRemote` object to kick off executions when a flyte entity needs to produce a
    value.

    For example:

    .. code-block:: python

        from flytekit import eager, task

        @task
        def add_one(x: int) -> int:
            return x + 1

        @task
        def double(x: int) -> int:
            return x * 2

        @eager
        async def eager_workflow(x: int) -> int:
            out = await add_one(x=x)
            return await double(x=out)

        # run locally with asyncio
        if __name__ == "__main__":
            import asyncio

            result = asyncio.run(eager_workflow(x=1))
            print(f"Result: {result}")  # "Result: 4"

    Unlike :py:func:`dynamic workflows <flytekit.dynamic>`, eager workflows are not compiled into a workflow spec, but
    use python's `async <https://docs.python.org/3/library/asyncio.html>`__ capabilities to execute flyte entities.

    .. note::

       Eager workflows only support `@task`, `@workflow`, and `@eager` entities. Dynamic workflows and launchplans are
       currently not supported.

    Note that the ``@eager`` function is an ``async`` function. Under the hood, tasks and workflows called inside
    an ``@eager`` workflow are executed asynchronously. This means that task and workflow calls will return an
    awaitable, which needs to be awaited.

    .. important::

       A ``client_secret_group`` and ``client_secret_key`` are needed for authenticating via
       :py:class:`~flytekit.remote.remote.FlyteRemote` using the ``client_credentials`` authentication, which is
       configured via :py:class:`~flytekit.configuration.PlatformConfig`.

       .. code-block:: python

            from flytekit.remote import FlyteRemote
            from flytekit.configuration import Config

            @eager(
                remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
                client_secret_group="my_client_secret_group",
                client_secret_key="my_client_secret_key",
            )
            async def eager_workflow(x: int) -> int:
                out = await add_one(x)
                return await double(out)

       Where ``config.yaml`` is a flytectl-compatible config file.
       For more details, see `here <https://docs.flyte.org/en/latest/flytectl/overview.html#configuration>`__.

       When using a sandbox cluster started with ``flytectl demo start``, however, the ``client_secret_group``
       and ``client_secret_key`` are not needed:

       .. code-block:: python

            @eager(remote=FlyteRemote(config=Config.for_sandbox()))
            async def eager_workflow(x: int) -> int:
                ...

    .. important::

       When using ``local_entrypoint=True`` you also need to specify the ``remote`` argument. In this case, the eager
       workflow runtime will be local, but all task/subworkflow invocations will occur on the specified Flyte cluster.
       This argument is primarily used for testing and debugging eager workflow logic locally.
    """

    if _fn is None:
        # Called as ``@eager(...)``: defer construction until the decorated function is supplied.
        return partial(
            eager,
            **kwargs,
        )

    # The task is always built with enable_deck=True below, so a caller-supplied value is dropped to avoid
    # passing the keyword twice.
    kwargs.pop("enable_deck", None)

    return EagerAsyncPythonFunctionTask(task_config=None, task_function=_fn, enable_deck=True, **kwargs)
5 changes: 4 additions & 1 deletion flytekit/core/worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ async def watch_one(self, work: WorkItem):
# Iterate in order so that we add to the interface in the correct order
for i in range(num_outputs):
key = f"o{i}"
# todo:async add a nicer error here
if key not in work.entity.interface.outputs:
raise AssertionError(
f"Output name {key} not found in outputs {[k for k in work.entity.interface.outputs.keys()]}"
)
var_type = work.entity.interface.outputs[key].type
python_outputs_interface[key] = TypeEngine.guess_python_type(var_type)
py_iface = Interface(inputs=typing.cast(dict[str, typing.Type], {}), outputs=python_outputs_interface)
Expand Down
143 changes: 0 additions & 143 deletions flytekit/experimental/eager_function.py
Original file line number Diff line number Diff line change
@@ -1,159 +1,16 @@
import os
from functools import partial
from typing import Optional

from flytekit import current_context
from flytekit.configuration import DataConfig, PlatformConfig, S3Config
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.python_function_task import EagerAsyncPythonFunctionTask
from flytekit.loggers import logger
from flytekit.remote import FlyteRemote

FLYTE_SANDBOX_INTERNAL_ENDPOINT = "flyte-sandbox-grpc.flyte:8089"
FLYTE_SANDBOX_MINIO_ENDPOINT = "http://flyte-sandbox-minio.flyte:9000"


# async def terminate(self):
# execution = self.remote.sync(self._execution)
# logger.debug(f"Cleaning up execution: {execution}")
# if not execution.is_done:
# self.remote.terminate(
# execution,
# f"Execution terminated by eager workflow execution {self.async_stack.parent_execution_id}.",
# )
#
# poll_interval = self._poll_interval or timedelta(seconds=6)
# time_to_give_up = (
# (datetime.max.replace(tzinfo=timezone.utc))
# if self._timeout is None
# else datetime.now(timezone.utc) + self._timeout
# )
#
# while datetime.now(timezone.utc) < time_to_give_up:
# execution = self.remote.sync(execution)
# if execution.is_done:
# break
# await asyncio.sleep(poll_interval.total_seconds())


def eager(
    _fn=None,
    *args,
    **kwargs,
) -> EagerAsyncPythonFunctionTask:
    """Eager workflow decorator.

    Can be applied bare (``@eager``) or with keyword arguments (``@eager(...)``). In the second form the
    decorator is first called without a function and returns a :py:func:`functools.partial` that carries the
    keyword arguments and is then applied to the decorated function.

    None of the options below are named in the signature; they are accepted through ``**kwargs``.

    :param _fn: The ``async`` function to wrap, or ``None`` when the decorator is called with keyword arguments
        only.
    :param args: Extra positional arguments. They are accepted but ignored.
    :param remote: A :py:class:`~flytekit.remote.FlyteRemote` object to use for executing Flyte entities.
    :param client_secret_group: The client secret group to use for this workflow.
    :param client_secret_key: The client secret key to use for this workflow.
    :param timeout: The timeout duration specifying how long to wait for a task/workflow execution within the eager
        workflow to complete or terminate. By default, the eager workflow will wait indefinitely until complete.
    :param poll_interval: The poll interval for checking if a task/workflow execution within the eager workflow has
        finished. If not specified, the default poll interval is 6 seconds.
    :param local_entrypoint: If True, the eager workflow can be executed locally but will use the provided
        :py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local
        testing against a remote Flyte cluster.
    :param client_secret_env_var: If specified, binds the client secret to the specified environment variable for
        remote authentication.
    :param kwargs: Keyword arguments forwarded to ``EagerAsyncPythonFunctionTask``. Any ``enable_deck`` value is
        discarded because the task is always created with ``enable_deck=True``.
    :return: An ``EagerAsyncPythonFunctionTask`` wrapping ``_fn``. When ``_fn`` is ``None`` a
        :py:func:`functools.partial` of this decorator is returned instead.

    This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be
    used inside of an ``@eager``-decorated function. This is because eager workflows use a
    :py:class:`~flytekit.remote.remote.FlyteRemote` object to kick off executions when a flyte entity needs to produce a
    value.

    For example:

    .. code-block:: python

        from flytekit import task
        from flytekit.experimental import eager

        @task
        def add_one(x: int) -> int:
            return x + 1

        @task
        def double(x: int) -> int:
            return x * 2

        @eager
        async def eager_workflow(x: int) -> int:
            out = await add_one(x=x)
            return await double(x=out)

        # run locally with asyncio
        if __name__ == "__main__":
            import asyncio

            result = asyncio.run(eager_workflow(x=1))
            print(f"Result: {result}")  # "Result: 4"

    Unlike :py:func:`dynamic workflows <flytekit.dynamic>`, eager workflows are not compiled into a workflow spec, but
    use python's `async <https://docs.python.org/3/library/asyncio.html>`__ capabilities to execute flyte entities.

    .. note::

       Eager workflows only support `@task`, `@workflow`, and `@eager` entities. Dynamic workflows and launchplans are
       currently not supported.

    Note that the ``@eager`` function is an ``async`` function. Under the hood, tasks and workflows called inside
    an ``@eager`` workflow are executed asynchronously. This means that task and workflow calls will return an
    awaitable, which needs to be awaited.

    .. important::

       A ``client_secret_group`` and ``client_secret_key`` are needed for authenticating via
       :py:class:`~flytekit.remote.remote.FlyteRemote` using the ``client_credentials`` authentication, which is
       configured via :py:class:`~flytekit.configuration.PlatformConfig`.

       .. code-block:: python

            from flytekit.remote import FlyteRemote
            from flytekit.configuration import Config

            @eager(
                remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
                client_secret_group="my_client_secret_group",
                client_secret_key="my_client_secret_key",
            )
            async def eager_workflow(x: int) -> int:
                out = await add_one(x)
                return await double(out)

       Where ``config.yaml`` is a flytectl-compatible config file.
       For more details, see `here <https://docs.flyte.org/en/latest/flytectl/overview.html#configuration>`__.

       When using a sandbox cluster started with ``flytectl demo start``, however, the ``client_secret_group``
       and ``client_secret_key`` are not needed:

       .. code-block:: python

            @eager(remote=FlyteRemote(config=Config.for_sandbox()))
            async def eager_workflow(x: int) -> int:
                ...

    .. important::

       When using ``local_entrypoint=True`` you also need to specify the ``remote`` argument. In this case, the eager
       workflow runtime will be local, but all task/subworkflow invocations will occur on the specified Flyte cluster.
       This argument is primarily used for testing and debugging eager workflow logic locally.
    """

    if _fn is None:
        # Called as ``@eager(...)``: defer construction until the decorated function is supplied.
        return partial(
            eager,
            **kwargs,
        )

    # The task is always built with enable_deck=True below, so a caller-supplied value is dropped to avoid
    # passing the keyword twice.
    kwargs.pop("enable_deck", None)

    return EagerAsyncPythonFunctionTask(task_config=None, task_function=_fn, enable_deck=True, **kwargs)


def _prepare_remote(
remote: Optional[FlyteRemote],
ctx: FlyteContext,
Expand Down
Loading

0 comments on commit af631bc

Please sign in to comment.