Skip to content

Commit

Permalink
merged master
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Sep 10, 2024
2 parents b460d78 + ae9c6f8 commit 76db07d
Show file tree
Hide file tree
Showing 21 changed files with 690 additions and 121 deletions.
6 changes: 6 additions & 0 deletions docs/source/design/clis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ Suppose you execute a script that defines 10 tasks and a workflow that calls onl

It is considered fast registration because when a script is executed using ``pyflyte run``, the script is bundled up and uploaded to FlyteAdmin. When the task is executed in the backend, this zipped file is extracted and used.

.. note ::
If `pigz <https://zlib.net/pigz/>`_ is installed, it will be leveraged by ``pyflyte`` to accelerate the compression of the code tarball.
.. _pyflyte-register:

What is ``pyflyte register``?
Expand Down Expand Up @@ -109,6 +113,7 @@ The ``serialize`` command is deprecated around the end of Q3 2024. Users should
Migrate
-------
To use the ``package`` command, make the following changes:

* The ``--local-source-root`` option should be changed to ``--source``
* If the ``--in-container-virtualenv-root`` option was already specified, then move to the ``--python-interpreter`` option in ``package``. The default Python interpreter for ``serialize`` was based on this deprecated flag and, if it was not specified, on ``sys.executable``. The default for ``package`` is ``/opt/venv/bin/python3``. If that is not where the Python interpreter is located in the task container, then you'll now need to specify ``--python-interpreter``. Note that this was only used for Spark tasks.
* The ``--in-container-config-path`` option should be removed as this was not actually being used by the ``serialize`` command.
Expand All @@ -117,5 +122,6 @@ To use the ``package`` command, make the following changes:
Functional Changes
------------------
Beyond the options, the ``package`` command differs in that:

* Whether or not to use fast register should be specified by the ``--copy auto`` or ``--copy all`` flags, rather than ``fast`` being a subcommand.
* The serialized file output by default is in a .tgz file, rather than being separate files. This means that any subsequent ``flytectl register`` command will need to be updated with the ``--archive`` flag.
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE
from flytekit.configuration import ImageConfig
from flytekit.configuration.plugin import get_plugin
from flytekit.constants import CopyFileDetection
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.fast_registration import CopyFileDetection

FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote"

Expand Down
3 changes: 2 additions & 1 deletion flytekit/clis/sdk_in_container/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
ImageConfig,
SerializationSettings,
)
from flytekit.constants import CopyFileDetection
from flytekit.interaction.click_types import key_value_callback
from flytekit.tools.fast_registration import CopyFileDetection, FastPackageOptions
from flytekit.tools.fast_registration import FastPackageOptions
from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package


Expand Down
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.configuration import ImageConfig
from flytekit.configuration.default_images import DefaultImages
from flytekit.constants import CopyFileDetection
from flytekit.interaction.click_types import key_value_callback
from flytekit.loggers import logger
from flytekit.tools import repo
from flytekit.tools.fast_registration import CopyFileDetection

_register_help = """
This command is similar to ``package`` but instead of producing a zip file, all your Flyte entities are compiled,
Expand Down
3 changes: 2 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SerializationSettings,
)
from flytekit.configuration.plugin import get_plugin
from flytekit.constants import CopyFileDetection
from flytekit.core import context_manager
from flytekit.core.artifact import ArtifactQuery
from flytekit.core.base_task import PythonTask
Expand Down Expand Up @@ -66,7 +67,7 @@
)
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader
from flytekit.tools.fast_registration import CopyFileDetection, FastPackageOptions
from flytekit.tools.fast_registration import FastPackageOptions
from flytekit.tools.script_mode import _find_project_root, compress_scripts, get_all_modules
from flytekit.tools.translator import Options

Expand Down
14 changes: 14 additions & 0 deletions flytekit/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

from enum import Enum


class CopyFileDetection(Enum):
    """
    Selects which source files are copied, both for fast registration (the ``--copy`` command line flag)
    and for ``ImageSpec.source_copy_mode``.

    A Python ``None`` in place of a member means that the user did not pick a copy mode at all.
    """

    # NOTE(review): presumably "only the files backing the loaded Python modules" (``--copy auto``);
    # confirm against the file-listing helper that consumes this value.
    LOADED_MODULES = 1
    # Copy all files (``--copy all``).
    ALL = 2
    # This option's meaning will change in the future. In the future this will mean that no files should be copied
    # (i.e. no fast registration is used). For now, this value and setting this Enum to Python None are both
    # valid, to distinguish between users explicitly setting --copy none and not setting the flag.
    # Currently, this is only used for register, not for package or run because run doesn't have a no-fast-register
    # option and package is by default non-fast.
    NO_COPY = 3
10 changes: 5 additions & 5 deletions flytekit/core/container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flytekit.core.context_manager import FlyteContext
from flytekit.core.interface import Interface
from flytekit.core.pod_template import PodTemplate
from flytekit.core.python_auto_container import get_registerable_container_image
from flytekit.core.python_auto_container import get_registerable_container_image, update_image_spec_copy_handling
from flytekit.core.resources import Resources, ResourceSpec
from flytekit.core.utils import _get_container_definition, _serialize_pod_spec
from flytekit.image_spec.image_spec import ImageSpec
Expand Down Expand Up @@ -279,10 +279,10 @@ def _get_data_loading_config(self) -> _task_model.DataLoadingConfig:
)

def _get_image(self, settings: SerializationSettings) -> str:
    """
    Update image spec based on fast registration usage, and return string representing the image.

    :param settings: Serialization settings; supplies the fast registration state, the source root and the
        image config used to resolve the image.
    :return: The image string produced by ``get_registerable_container_image``.
    """
    if isinstance(self._image, ImageSpec):
        # All source_root / copy-mode decisions live in the shared helper. Assigning source_root here
        # directly would clobber a source root the user set on the ImageSpec.
        update_image_spec_copy_handling(self._image, settings)

    return get_registerable_container_image(self._image, settings.image_config)

def _get_container(self, settings: SerializationSettings) -> _task_model.Container:
Expand Down
40 changes: 36 additions & 4 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flyteidl.core import tasks_pb2

from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.constants import CopyFileDetection
from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.pod_template import PodTemplate
Expand Down Expand Up @@ -185,10 +186,10 @@ def get_command(self, settings: SerializationSettings) -> List[str]:
return self._get_command_fn(settings)

def get_image(self, settings: SerializationSettings) -> str:
    """
    Update image spec based on fast registration usage, and return string representing the image.

    :param settings: Serialization settings; supplies the fast registration state, the source root and the
        image config used to resolve the image.
    :return: The image string produced by ``get_registerable_container_image``.
    """
    if isinstance(self.container_image, ImageSpec):
        # All source_root / copy-mode decisions live in the shared helper. Assigning source_root here
        # directly would clobber a source root the user set on the ImageSpec.
        update_image_spec_copy_handling(self.container_image, settings)

    return get_registerable_container_image(self.container_image, settings.image_config)

def get_container(self, settings: SerializationSettings) -> _task_model.Container:
Expand Down Expand Up @@ -273,6 +274,37 @@ def get_all_tasks(self) -> List[PythonAutoContainerTask]: # type: ignore
default_task_resolver = DefaultTaskResolver()


def update_image_spec_copy_handling(image_spec: ImageSpec, settings: SerializationSettings):
    """
    Reconcile an ImageSpec's ``source_root`` / ``source_copy_mode`` with the serialization settings.

    This helper function is where the relationship between fast register and ImageSpec is codified.
    If fast register is not enabled, then source root is used and then files are copied.
    See the copy option in ImageSpec for more information.

    Currently the relationship is incidental. Because serialization settings are not passed into the image spec
    build command (and it probably shouldn't be), the builder has no concept of which files to copy, when, and
    from where. (or to where but that is hard-coded)

    :param image_spec: The image spec to update. It is mutated in place.
    :param settings: Serialization settings providing ``source_root`` and ``fast_serialization_settings``.
    :return: None
    """
    # Handle when the copy method is explicitly set by the user.
    if image_spec.source_copy_mode is not None:
        # If we need to copy any files, make sure source root is set. This preserves the behavior pre-copy arg,
        # and allows the user to not have to specify source root. An existing source root is never overwritten.
        if (
            image_spec.source_copy_mode != CopyFileDetection.NO_COPY
            and image_spec.source_root is None
            and settings.source_root is not None
        ):
            image_spec.source_root = settings.source_root
        return

    # Handle the default behavior of setting the behavior based on the inverse of fast register usage.
    fast_settings = settings.fast_serialization_settings
    if fast_settings is not None and fast_settings.enabled:
        # Fast registration is in use, so by default nothing is copied into the image.
        return

    # The default behavior additionally requires that a source root is available. Unfortunately whether the
    # source_root/copy instructions should be set is implicitly dependent also on the existence of the source
    # root in settings (or one already present on the image spec).
    if settings.source_root is None and image_spec.source_root is None:
        return

    # Set the source root for the image spec if it's non-fast registration.
    if image_spec.source_root is None:
        image_spec.source_root = settings.source_root
    # source_copy_mode is known to be None here (the explicit case returned above), so apply the default.
    image_spec.source_copy_mode = CopyFileDetection.LOADED_MODULES


def get_registerable_container_image(img: Optional[Union[str, ImageSpec]], cfg: ImageConfig) -> str:
"""
Resolve the image to the real image name that should be used for registration.
Expand Down
96 changes: 51 additions & 45 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import mimetypes
import sys
import textwrap
import threading
import typing
from abc import ABC, abstractmethod
from collections import OrderedDict
Expand Down Expand Up @@ -842,6 +843,7 @@ class TypeEngine(typing.Generic[T]):
_DATACLASS_TRANSFORMER: TypeTransformer = DataclassTransformer() # type: ignore
_ENUM_TRANSFORMER: TypeTransformer = EnumTransformer() # type: ignore
has_lazy_import = False
lazy_import_lock = threading.Lock()

@classmethod
def register(
Expand Down Expand Up @@ -995,51 +997,55 @@ def lazy_import_transformers(cls):
"""
Only load the transformers if needed.
"""
if cls.has_lazy_import:
return
cls.has_lazy_import = True
from flytekit.types.structured import (
register_arrow_handlers,
register_bigquery_handlers,
register_pandas_handlers,
register_snowflake_handlers,
)
from flytekit.types.structured.structured_dataset import DuplicateHandlerError

if is_imported("tensorflow"):
from flytekit.extras import tensorflow # noqa: F401
if is_imported("torch"):
from flytekit.extras import pytorch # noqa: F401
if is_imported("sklearn"):
from flytekit.extras import sklearn # noqa: F401
if is_imported("pandas"):
try:
from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter # noqa: F401
except ValueError:
logger.debug("Transformer for pandas is already registered.")
try:
register_pandas_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for pandas is already registered.")
if is_imported("pyarrow"):
try:
register_arrow_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for arrow is already registered.")
if is_imported("google.cloud.bigquery"):
try:
register_bigquery_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for bigquery is already registered.")
if is_imported("numpy"):
from flytekit.types import numpy # noqa: F401
if is_imported("PIL"):
from flytekit.types.file import image # noqa: F401
if is_imported("snowflake.connector"):
try:
register_snowflake_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for snowflake is already registered.")
with cls.lazy_import_lock:
# Avoid a race condition where concurrent threads may exit lazy_import_transformers before the transformers
# have been imported. This could be implemented without a lock if you assume python assignments are atomic
# and re-registering transformers is acceptable, but I decided to play it safe.
if cls.has_lazy_import:
return
cls.has_lazy_import = True
from flytekit.types.structured import (
register_arrow_handlers,
register_bigquery_handlers,
register_pandas_handlers,
register_snowflake_handlers,
)
from flytekit.types.structured.structured_dataset import DuplicateHandlerError

if is_imported("tensorflow"):
from flytekit.extras import tensorflow # noqa: F401
if is_imported("torch"):
from flytekit.extras import pytorch # noqa: F401
if is_imported("sklearn"):
from flytekit.extras import sklearn # noqa: F401
if is_imported("pandas"):
try:
from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter # noqa: F401
except ValueError:
logger.debug("Transformer for pandas is already registered.")
try:
register_pandas_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for pandas is already registered.")
if is_imported("pyarrow"):
try:
register_arrow_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for arrow is already registered.")
if is_imported("google.cloud.bigquery"):
try:
register_bigquery_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for bigquery is already registered.")
if is_imported("numpy"):
from flytekit.types import numpy # noqa: F401
if is_imported("PIL"):
from flytekit.types.file import image # noqa: F401
if is_imported("snowflake.connector"):
try:
register_snowflake_handlers()
except DuplicateHandlerError:
logger.debug("Transformer for snowflake is already registered.")

@classmethod
def to_literal_type(cls, python_type: Type) -> LiteralType:
Expand Down
30 changes: 23 additions & 7 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import click

from flytekit.constants import CopyFileDetection
from flytekit.image_spec.image_spec import (
_F_IMG_ID,
ImageSpec,
ImageSpecBuilder,
)
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore
from flytekit.tools.script_mode import ls_files

UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
"""\
Expand Down Expand Up @@ -165,16 +167,28 @@ def create_docker_context(image_spec: ImageSpec, tmp_dir: Path):

apt_install_command = APT_INSTALL_COMMAND_TEMPLATE.substitute(APT_PACKAGES=" ".join(apt_packages))

if image_spec.source_root:
source_path = tmp_dir / "src"
if image_spec.source_copy_mode is not None and image_spec.source_copy_mode != CopyFileDetection.NO_COPY:
if not image_spec.source_root:
raise ValueError(f"Field source_root for {image_spec} must be set when copy is set")

source_path = tmp_dir / "src"
source_path.mkdir(parents=True, exist_ok=True)
# todo: we should pipe through ignores from the command line here at some point.
# what about deref_symlink?
ignore = IgnoreGroup(image_spec.source_root, [GitIgnore, DockerIgnore, StandardIgnore])
shutil.copytree(
image_spec.source_root,
source_path,
ignore=shutil.ignore_patterns(*ignore.list_ignored()),
dirs_exist_ok=True,

ls, _ = ls_files(
str(image_spec.source_root), image_spec.source_copy_mode, deref_symlinks=False, ignore_group=ignore
)

for file_to_copy in ls:
rel_path = os.path.relpath(file_to_copy, start=str(image_spec.source_root))
Path(source_path / rel_path).parent.mkdir(parents=True, exist_ok=True)
shutil.copy(
file_to_copy,
source_path / rel_path,
)

copy_command_runtime = "COPY --chown=flytekit ./src /root"
else:
copy_command_runtime = ""
Expand Down Expand Up @@ -228,10 +242,12 @@ class DefaultImageBuilder(ImageSpecBuilder):
"""Image builder using Docker and buildkit."""

_SUPPORTED_IMAGE_SPEC_PARAMETERS: ClassVar[set] = {
"id",
"name",
"python_version",
"builder",
"source_root",
"copy",
"env",
"registry",
"packages",
Expand Down
Loading

0 comments on commit 76db07d

Please sign in to comment.