Skip to content

Commit

Permalink
General Partial support in flytekit and multi-list support in flytekit (
Browse files Browse the repository at this point in the history
#1556)

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored and eapolinario committed May 16, 2023
1 parent 19a0367 commit 9a31a01
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 62 deletions.
11 changes: 3 additions & 8 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import click as _click
from flyteidl.core import literals_pb2 as _literals_pb2

from flytekit import PythonFunctionTask
from flytekit.configuration import (
SERIALIZED_CONTEXT_ENV_VAR,
FastSerializationSettings,
Expand All @@ -23,7 +22,7 @@
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.map_task import MapPythonTask
from flytekit.core.map_task import MapTaskResolver
from flytekit.core.promise import VoidPromise
from flytekit.exceptions import scopes as _scoped_exceptions
from flytekit.exceptions import scopes as _scopes
Expand Down Expand Up @@ -391,12 +390,8 @@ def _execute_map_task(
with setup_execution(
raw_output_data_prefix, checkpoint_path, prev_checkpoint, dynamic_addl_distro, dynamic_dest_dir
) as ctx:
resolver_obj = load_object_from_module(resolver)
# Use the resolver to load the actual task object
_task_def = resolver_obj.load_task(loader_args=resolver_args)
if not isinstance(_task_def, PythonFunctionTask):
raise Exception("Map tasks cannot be run with instance tasks.")
map_task = MapPythonTask(_task_def, max_concurrency)
mtr = MapTaskResolver()
map_task = mtr.load_task(loader_args=resolver_args, max_concurrency=max_concurrency)

task_index = _compute_array_job_index()
output_prefix = os.path.join(output_prefix, str(task_index))
Expand Down
52 changes: 45 additions & 7 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@
T = typing.TypeVar("T")


def repr_kv(k: str, v: Union[Type, Tuple[Type, Any]]) -> str:
if isinstance(v, tuple):
if v[1]:
return f"{k}: {v[0]}={v[1]}"
return f"{k}: {v[0]}"
return f"{k}: {v}"


def repr_type_signature(io: Union[Dict[str, Tuple[Type, Any]], Dict[str, Type]]) -> str:
"""
Converts an inputs and outputs to a type signature
"""
s = "("
i = 0
for k, v in io.items():
if i > 0:
s += ", "
s += repr_kv(k, v)
i = i + 1
return s + ")"


class Interface(object):
"""
A Python native interface object, like inspect.signature but simpler.
Expand Down Expand Up @@ -57,7 +79,9 @@ def __init__(
variables = [k for k in outputs.keys()]

# TODO: This class is a duplicate of the one in create_task_outputs. Over time, we should move to this one.
class Output(collections.namedtuple(output_tuple_name or "DefaultNamedTupleOutput", variables)):
class Output( # type: ignore
collections.namedtuple(output_tuple_name or "DefaultNamedTupleOutput", variables) # type: ignore
): # type: ignore
"""
This class can be used in two different places. For multivariate-return entities this class is used
to rewrap the outputs so that our with_overrides function can work.
Expand Down Expand Up @@ -167,6 +191,12 @@ def with_outputs(self, extra_outputs: Dict[str, Type]) -> Interface:
new_outputs[k] = v
return Interface(self._inputs, new_outputs)

def __str__(self):
return f"{repr_type_signature(self._inputs)} -> {repr_type_signature(self._outputs)}"

def __repr__(self):
return str(self)


def transform_inputs_to_parameters(
ctx: context_manager.FlyteContext, interface: Interface
Expand Down Expand Up @@ -220,7 +250,7 @@ def transform_interface_to_typed_interface(
return _interface_models.TypedInterface(inputs_map, outputs_map)


def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:
def transform_types_to_list_of_type(m: Dict[str, type], bound_inputs: typing.Set[str]) -> Dict[str, type]:
"""
Converts a given variables to be collections of their type. This is useful for array jobs / map style code.
It will create a collection of types even if any one these types is not a collection type
Expand All @@ -230,6 +260,10 @@ def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:

all_types_are_collection = True
for k, v in m.items():
if k in bound_inputs:
# Skip the inputs that are bound. If they are bound, it does not matter if they are collection or
# singletons
continue
v_type = type(v)
if v_type != typing.List and v_type != list:
all_types_are_collection = False
Expand All @@ -240,17 +274,22 @@ def transform_types_to_list_of_type(m: Dict[str, type]) -> Dict[str, type]:

om = {}
for k, v in m.items():
om[k] = typing.List[v]
if k in bound_inputs:
om[k] = v
else:
om[k] = typing.List[v] # type: ignore
return om # type: ignore


def transform_interface_to_list_interface(interface: Interface) -> Interface:
def transform_interface_to_list_interface(interface: Interface, bound_inputs: typing.Set[str]) -> Interface:
"""
Takes a single task interface and interpolates it to an array interface - to allow performing distributed python map
like functions
:param interface: Interface to be upgraded toa list interface
:param bound_inputs: fixed inputs that should not upgraded to a list and will be maintained as scalars.
"""
map_inputs = transform_types_to_list_of_type(interface.inputs)
map_outputs = transform_types_to_list_of_type(interface.outputs)
map_inputs = transform_types_to_list_of_type(interface.inputs, bound_inputs)
map_outputs = transform_types_to_list_of_type(interface.outputs, set())

return Interface(inputs=map_inputs, outputs=map_outputs)

Expand Down Expand Up @@ -286,7 +325,6 @@ def transform_function_to_interface(fn: typing.Callable, docstring: Optional[Doc
For now the fancy object, maybe in the future a dumb object.
"""

type_hints = get_type_hints(fn, include_extras=True)
signature = inspect.signature(fn)
return_annotation = type_hints.get("return", None)
Expand Down
Loading

0 comments on commit 9a31a01

Please sign in to comment.