Merge remote-tracking branch 'origin/master' into switch-dir-copy
wild-endeavor committed Nov 28, 2022
2 parents 5c37326 + 7cf5b68 commit 7f3575a
Showing 19 changed files with 451 additions and 69 deletions.
1 change: 1 addition & 0 deletions dev-requirements.in
@@ -15,3 +15,4 @@ IPython
# Newer versions of torch bring in nvidia dependencies that are not present in windows, so
# we put this constraint while we do not have per-environment requirements files
torch<=1.12.1
scikit-learn
21 changes: 11 additions & 10 deletions dev-requirements.txt
@@ -8,6 +8,8 @@
# via
# -c requirements.txt
# pytest-flyte
appnope==0.1.3
# via ipython
arrow==1.2.3
# via
# -c requirements.txt
@@ -77,7 +79,6 @@ cryptography==38.0.3
# -c requirements.txt
# paramiko
# pyopenssl
# secretstorage
dataclasses-json==0.5.7
# via
# -c requirements.txt
@@ -192,11 +193,6 @@ jaraco-classes==3.2.3
# keyring
jedi==0.18.1
# via ipython
jeepney==0.8.0
# via
# -c requirements.txt
# keyring
# secretstorage
jinja2==3.1.2
# via
# -c requirements.txt
@@ -212,6 +208,7 @@ joblib==1.2.0
# -c requirements.txt
# -r dev-requirements.in
# flytekit
# scikit-learn
jsonschema==3.2.0
# via
# -c requirements.txt
@@ -265,6 +262,8 @@ numpy==1.21.6
# flytekit
# pandas
# pyarrow
# scikit-learn
# scipy
packaging==21.3
# via
# -c requirements.txt
@@ -419,10 +418,10 @@ retry==0.9.2
# flytekit
rsa==4.9
# via google-auth
secretstorage==3.3.3
# via
# -c requirements.txt
# keyring
scikit-learn==1.0.2
# via -r dev-requirements.in
scipy==1.7.3
# via scikit-learn
singledispatchmethod==1.0
# via
# -c requirements.txt
@@ -451,6 +450,8 @@ text-unidecode==1.3
# python-slugify
texttable==1.6.4
# via docker-compose
threadpoolctl==3.1.0
# via scikit-learn
toml==0.10.2
# via
# -c requirements.txt
1 change: 1 addition & 0 deletions doc-requirements.in
@@ -44,3 +44,4 @@ tensorflow==2.9.0 # onnxtensorflow
whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
7 changes: 7 additions & 0 deletions docs/source/extras.sklearn.rst
@@ -0,0 +1,7 @@
############
Sklearn Type
############
.. automodule:: flytekit.extras.sklearn
:no-members:
:no-inherited-members:
:no-special-members:
1 change: 1 addition & 0 deletions flytekit/core/base_task.py
@@ -487,6 +487,7 @@ def dispatch_execute(

# Short circuit the translation to literal map because what's returned may be a dj spec (or an
# already-constructed LiteralMap if the dynamic task was a no-op), not python native values
# dynamic_execute returns a LiteralMap during local execution, so this branch is triggered there as well.
if isinstance(native_outputs, _literal_models.LiteralMap) or isinstance(
native_outputs, _dynamic_job.DynamicJobSpec
):
20 changes: 10 additions & 10 deletions flytekit/core/context_manager.py
@@ -450,20 +450,20 @@ class Mode(Enum):
Defines the possible execution modes, which in turn affect execution behavior.
"""

#: This is the mode that is used when a task execution mimics the actual runtime environment.
#: NOTE: This is important to understand the difference between TASK_EXECUTION and LOCAL_TASK_EXECUTION
#: LOCAL_TASK_EXECUTION, is the mode that is run purely locally and in some cases the difference between local
#: and runtime environment may be different. For example for Dynamic tasks local_task_execution will just run it
#: as a regular function, while task_execution will extract a runtime spec
# This is the mode that is used when a task execution mimics the actual runtime environment.
# NOTE: It is important to understand the difference between TASK_EXECUTION and LOCAL_TASK_EXECUTION.
# LOCAL_TASK_EXECUTION runs purely locally, and in some cases the local environment differs from the
# runtime environment. For example, for dynamic tasks LOCAL_TASK_EXECUTION just runs the function as a
# regular function, while TASK_EXECUTION extracts a runtime spec.
TASK_EXECUTION = 1

#: This represents when flytekit is locally running a workflow. The behavior of tasks differs in this case
#: because instead of running a task's user defined function directly, it'll need to wrap the return values in
#: NodeOutput
# This represents when flytekit is locally running a workflow. The behavior of tasks differs in this case
# because instead of running a task's user defined function directly, it'll need to wrap the return values in
# NodeOutput
LOCAL_WORKFLOW_EXECUTION = 2

#: This is the mode that is used to to indicate a purely local task execution - i.e. running without a container
#: or propeller.
# This is the mode that is used to indicate a purely local task execution - i.e. running without a container
# or propeller.
LOCAL_TASK_EXECUTION = 3

mode: Optional[ExecutionState.Mode]
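For orientation, a minimal sketch of branching on these modes via the current context; the branch bodies are illustrative:

from flytekit.core.context_manager import ExecutionState, FlyteContextManager

ctx = FlyteContextManager.current_context()
state = ctx.execution_state
if state and state.mode == ExecutionState.Mode.LOCAL_TASK_EXECUTION:
    # Purely local task execution: no container, no propeller.
    pass
elif state and state.mode == ExecutionState.Mode.TASK_EXECUTION:
    # Mimics the actual runtime environment; dynamic tasks extract a runtime spec here.
    pass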
7 changes: 6 additions & 1 deletion flytekit/core/node.py
@@ -108,6 +108,8 @@ def with_overrides(self, *args, **kwargs):
)
if "interruptible" in kwargs:
self._metadata._interruptible = kwargs["interruptible"]
if "name" in kwargs:
self._metadata._name = kwargs["name"]
return self


@@ -134,7 +136,10 @@ def _convert_resource_overrides(
)
if resources.ephemeral_storage is not None:
resource_entries.append(
_resources_model.ResourceEntry(_resources_model.ResourceName.EPHEMERAL_STORAGE, resources.ephemeral_storage)
_resources_model.ResourceEntry(
_resources_model.ResourceName.EPHEMERAL_STORAGE,
resources.ephemeral_storage,
)
)

return resource_entries
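The new override can be exercised like the existing ones. A minimal sketch, with hypothetical task and node names:

from flytekit import task, workflow


@task
def double(x: int) -> int:
    return x * 2


@workflow
def wf(x: int = 3) -> int:
    # "name" now joins the existing node-level overrides such as interruptible
    # and resource requests; it renames the node, not the task definition.
    return double(x=x).with_overrides(name="double-node", interruptible=True)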
69 changes: 49 additions & 20 deletions flytekit/core/python_function_task.py
@@ -19,12 +19,11 @@
from enum import Enum
from typing import Any, Callable, List, Optional, TypeVar, Union

from flytekit.configuration import SerializationSettings
from flytekit.configuration.default_images import DefaultImages
from flytekit.core.base_task import Task, TaskResolverMixin
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager
from flytekit.core.docstring import Docstring
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.promise import VoidPromise, translate_inputs_to_literals
from flytekit.core.python_auto_container import PythonAutoContainerTask, default_task_resolver
from flytekit.core.tracker import extract_task_module, is_functools_wrapped_module_level, isnested, istestfunction
from flytekit.core.workflow import (
@@ -34,6 +33,7 @@
WorkflowMetadataDefaults,
)
from flytekit.exceptions import scopes as exception_scopes
from flytekit.exceptions.user import FlyteValueException
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
@@ -145,6 +145,7 @@ def __init__(
)
self._task_function = task_function
self._execution_mode = execution_mode
self._wf = None # For dynamic tasks

@property
def execution_mode(self) -> ExecutionBehavior:
@@ -164,6 +165,14 @@ def execute(self, **kwargs) -> Any:
elif self.execution_mode == self.ExecutionBehavior.DYNAMIC:
return self.dynamic_execute(self._task_function, **kwargs)

def _create_and_cache_dynamic_workflow(self):
if self._wf is None:
workflow_meta = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
defaults = WorkflowMetadataDefaults(
interruptible=self.metadata.interruptible if self.metadata.interruptible is not None else False
)
self._wf = PythonFunctionWorkflow(self._task_function, metadata=workflow_meta, default_metadata=defaults)

def compile_into_workflow(
self, ctx: FlyteContext, task_function: Callable, **kwargs
) -> Union[_dynamic_job.DynamicJobSpec, _literal_models.LiteralMap]:
@@ -183,12 +192,7 @@ def compile_into_workflow(
# TODO: Resolve circular import
from flytekit.tools.translator import get_serializable

workflow_metadata = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
defaults = WorkflowMetadataDefaults(
interruptible=self.metadata.interruptible if self.metadata.interruptible is not None else False
)

self._wf = PythonFunctionWorkflow(task_function, metadata=workflow_metadata, default_metadata=defaults)
self._create_and_cache_dynamic_workflow()
self._wf.compile(**kwargs)

wf = self._wf
@@ -259,19 +263,44 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:
representing that newly generated workflow, instead of executing it.
"""
ctx = FlyteContextManager.current_context()
# This is a placeholder SerializationSettings and is only used to test compilation for dynamic tasks
# when run locally. The output of the compilation should never actually be used anywhere.
_LOCAL_ONLY_SS = SerializationSettings.for_image(DefaultImages.default_image(), "v", "p", "d")

if ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION:
updated_exec_state = ctx.execution_state.with_params(mode=ExecutionState.Mode.TASK_EXECUTION)
with FlyteContextManager.with_context(
ctx.with_execution_state(updated_exec_state).with_serialization_settings(_LOCAL_ONLY_SS)
) as ctx:
logger.debug(f"Running compilation for {self} as part of local run as check")
self.compile_into_workflow(ctx, task_function, **kwargs)
logger.info("Executing Dynamic workflow, using raw inputs")
return exception_scopes.user_entry_point(task_function)(**kwargs)
# The rest of this function mimics the local_execute of the workflow. We can't use the workflow
# local_execute directly though since that converts inputs into Promises.
logger.debug(f"Executing Dynamic workflow, using raw inputs {kwargs}")
self._create_and_cache_dynamic_workflow()
function_outputs = self._wf.execute(**kwargs)

if isinstance(function_outputs, VoidPromise) or function_outputs is None:
return VoidPromise(self.name)

if len(self._wf.python_interface.outputs) == 0:
raise FlyteValueException(function_outputs, "Interface output should've been VoidPromise or None.")

# TODO: This will need to be cleaned up when we revisit top-level tuple support.
expected_output_names = list(self.python_interface.outputs.keys())
if len(expected_output_names) == 1:
# Here we have to handle the fact that the wf could've been declared with a typing.NamedTuple of
# length one. That convention is used for naming outputs - and single-length-NamedTuples are
# particularly troublesome but elegant handling of them is not a high priority
# Again, we're using the output_tuple_name as a proxy.
if self.python_interface.output_tuple_name and isinstance(function_outputs, tuple):
wf_outputs_as_map = {expected_output_names[0]: function_outputs[0]}
else:
wf_outputs_as_map = {expected_output_names[0]: function_outputs}
else:
wf_outputs_as_map = {
expected_output_names[i]: function_outputs[i] for i, _ in enumerate(function_outputs)
}

# In a normal workflow, we'd repackage the promises coming from tasks into new Promises matching the
# workflow's interface. For a dynamic workflow, just return the literal map.
wf_outputs_as_literal_dict = translate_inputs_to_literals(
ctx,
wf_outputs_as_map,
flyte_interface_types=self.interface.outputs,
native_types=self.python_interface.outputs,
)
return _literal_models.LiteralMap(literals=wf_outputs_as_literal_dict)

if ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
return self.compile_into_workflow(ctx, task_function, **kwargs)
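The net effect: when a @dynamic task runs locally, its body now executes through the cached PythonFunctionWorkflow and the outputs are translated into a LiteralMap, mirroring remote behavior. A minimal sketch of a dynamic task exercising this path, with illustrative names:

from typing import List

from flytekit import dynamic, task, workflow


@task
def square(x: int) -> int:
    return x * x


@task
def reduce_sum(xs: List[int]) -> int:
    return sum(xs)


@dynamic
def fan_out(n: int) -> int:
    # The loop shape is decided at runtime; locally this body now runs via
    # the cached PythonFunctionWorkflow rather than as a bare function.
    squares = [square(x=i) for i in range(n)]
    return reduce_sum(xs=squares)


@workflow
def wf(n: int = 3) -> int:
    return fan_out(n=n)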
26 changes: 26 additions & 0 deletions flytekit/extras/sklearn/__init__.py
@@ -0,0 +1,26 @@
"""
Flytekit Sklearn
=========================================
.. currentmodule:: flytekit.extras.sklearn
.. autosummary::
:template: custom.rst
:toctree: generated/
"""
from flytekit.loggers import logger

# TODO: abstract this out so that there's an established pattern for registering plugins
# that have soft dependencies
try:
# isolate the exception to the sklearn import
import sklearn

_sklearn_installed = True
except (ImportError, OSError):
_sklearn_installed = False


if _sklearn_installed:
from .native import SklearnEstimatorTransformer
else:
logger.info("We won't register SklearnEstimatorTransformer because scikit-learn is not installed.")
79 changes: 79 additions & 0 deletions flytekit/extras/sklearn/native.py
@@ -0,0 +1,79 @@
import pathlib
from typing import Generic, Type, TypeVar

import joblib
import sklearn

from flytekit.core.context_manager import FlyteContext
from flytekit.core.type_engine import TypeEngine, TypeTransformer, TypeTransformerFailedError
from flytekit.models.core import types as _core_types
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType

T = TypeVar("T")


class SklearnTypeTransformer(TypeTransformer, Generic[T]):
def get_literal_type(self, t: Type[T]) -> LiteralType:
return LiteralType(
blob=_core_types.BlobType(
format=self.SKLEARN_FORMAT,
dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE,
)
)

def to_literal(
self,
ctx: FlyteContext,
python_val: T,
python_type: Type[T],
expected: LiteralType,
) -> Literal:
meta = BlobMetadata(
type=_core_types.BlobType(
format=self.SKLEARN_FORMAT,
dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE,
)
)

local_path = ctx.file_access.get_random_local_path() + ".joblib"
pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)

# save sklearn estimator to a file
joblib.dump(python_val, local_path)

remote_path = ctx.file_access.get_random_remote_path(local_path)
ctx.file_access.put_data(local_path, remote_path, is_multipart=False)
return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=remote_path)))

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
try:
uri = lv.scalar.blob.uri
except AttributeError:
raise TypeTransformerFailedError(f"Cannot convert from {lv} to {expected_python_type}")

local_path = ctx.file_access.get_random_local_path()
ctx.file_access.get_data(uri, local_path, is_multipart=False)

# load sklearn estimator from a file
return joblib.load(local_path)

def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
if (
literal_type.blob is not None
and literal_type.blob.dimensionality == _core_types.BlobType.BlobDimensionality.SINGLE
and literal_type.blob.format == self.SKLEARN_FORMAT
):
return T

raise ValueError(f"Transformer {self} cannot reverse {literal_type}")


class SklearnEstimatorTransformer(SklearnTypeTransformer[sklearn.base.BaseEstimator]):
SKLEARN_FORMAT = "SklearnEstimator"

def __init__(self):
super().__init__(name="Sklearn Estimator", t=sklearn.base.BaseEstimator)


TypeEngine.register(SklearnEstimatorTransformer())
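With the transformer registered, tasks can exchange estimators by annotating with sklearn.base.BaseEstimator. A minimal usage sketch; the task bodies are illustrative:

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression

from flytekit import task, workflow


@task
def fit(n: int) -> BaseEstimator:
    # The returned estimator is serialized to a single .joblib blob by the transformer.
    x = np.random.rand(n, 2)
    y = (x[:, 0] > 0.5).astype(int)
    return LogisticRegression().fit(x, y)


@task
def predict(model: BaseEstimator) -> int:
    return int(model.predict(np.array([[0.1, 0.9]]))[0])


@workflow
def wf(n: int = 100) -> int:
    return predict(model=fit(n=n))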
9 changes: 3 additions & 6 deletions plugins/flytekit-dbt/flytekitplugins/dbt/schema.py
@@ -251,11 +251,8 @@ class DBTFreshnessOutput(BaseDBTOutput):
Attributes
----------
raw_run_result : str
Raw value of DBT's ``run_result.json``.
raw_manifest : str
Raw value of DBT's ``manifest.json``.
raw_sources : str
Raw value of DBT's ``sources.json``.
"""

raw_run_result: str
raw_manifest: str
raw_sources: str