Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn on StructuredDataset #885

Merged
merged 4 commits
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,9 @@ jobs:
run: |
make setup${{ matrix.spark-version-suffix }}
pip freeze
- name: Test FlyteSchema compatibility
run: |
FLYTE_SDK_USE_STRUCTURED_DATASET=FALSE python -m pytest tests/flytekit_compatibility
- name: Test with coverage
run: |
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests/flytekit/unit
coverage run -m pytest tests/flytekit/unit
- name: Codecov
uses: codecov/codecov-action@v1
with:
Expand Down Expand Up @@ -107,7 +104,7 @@ jobs:
- name: Test with coverage
run: |
cd plugins/${{ matrix.plugin-names }}
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests
coverage run -m pytest tests

lint:
runs-on: ubuntu-latest
Expand Down
1 change: 0 additions & 1 deletion Dockerfile.py310
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 0 additions & 1 deletion Dockerfile.py37
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 0 additions & 1 deletion Dockerfile.py38
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
1 change: 0 additions & 1 deletion Dockerfile.py39
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ RUN pip install -U flytekit==$VERSION
WORKDIR /app

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ test: lint unit_test

.PHONY: unit_test
unit_test:
FLYTE_SDK_USE_STRUCTURED_DATASET=FALSE pytest tests/flytekit_compatibility && \
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE pytest tests/flytekit/unit
pytest tests/flytekit/unit

requirements-spark2.txt: export CUSTOM_COMPILE_COMMAND := make requirements-spark2.txt
requirements-spark2.txt: requirements-spark2.in install-piptools
Expand Down Expand Up @@ -79,7 +78,7 @@ requirements: requirements.txt dev-requirements.txt requirements-spark2.txt doc-
# TODO: Change this in the future to be all of flytekit
.PHONY: coverage
coverage:
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE coverage run -m pytest tests/flytekit/unit/core flytekit/types
coverage run -m pytest tests/flytekit/unit/core flytekit/types
coverage report -m --include="flytekit/core/*,flytekit/types/*"

PLACEHOLDER := "__version__\ =\ \"0.0.0+develop\""
Expand Down
14 changes: 6 additions & 8 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,12 @@
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType
from flytekit.types import directory, file, schema

if _internal.LocalSDK.USE_STRUCTURED_DATASET.read():
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
StructuredDatasetFormat,
StructuredDatasetTransformerEngine,
StructuredDatasetType,
)
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
StructuredDatasetFormat,
StructuredDatasetTransformerEngine,
StructuredDatasetType,
)

__version__ = "0.0.0+develop"

Expand Down
6 changes: 0 additions & 6 deletions flytekit/configuration/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ class LocalSDK(object):
TODO delete the one from internal config
"""

# Feature Gate
USE_STRUCTURED_DATASET = ConfigEntry(LegacyConfigEntry(SECTION, "use_structured_dataset", bool))
"""
Note: This gate will be switched to True at some point in the future. Definitely by 1.0, if not v0.31.0.
"""


class Secrets(object):
SECTION = "secrets"
Expand Down
37 changes: 18 additions & 19 deletions flytekit/types/structured/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
from flytekit.configuration.internal import LocalSDK
from flytekit.loggers import logger

if LocalSDK.USE_STRUCTURED_DATASET.read():
from .basic_dfs import (
ArrowToParquetEncodingHandler,
PandasToParquetEncodingHandler,
ParquetToArrowDecodingHandler,
ParquetToPandasDecodingHandler,
)
from .basic_dfs import (
ArrowToParquetEncodingHandler,
PandasToParquetEncodingHandler,
ParquetToArrowDecodingHandler,
ParquetToPandasDecodingHandler,
)

try:
from .bigquery import (
ArrowToBQEncodingHandlers,
BQToArrowDecodingHandler,
BQToPandasDecodingHandler,
PandasToBQEncodingHandlers,
)
except ImportError:
logger.info(
"We won't register bigquery handler for structured dataset because "
"we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery"
)
try:
from .bigquery import (
ArrowToBQEncodingHandlers,
BQToArrowDecodingHandler,
BQToPandasDecodingHandler,
PandasToBQEncodingHandlers,
)
except ImportError:
logger.info(
"We won't register bigquery handler for structured dataset because "
"we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery"
)
12 changes: 2 additions & 10 deletions flytekit/types/structured/structured_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,6 @@ def register(cls, h: Handlers, default_for_type: Optional[bool] = True, override

The string "://" should not be present in any handler's protocol so we don't check for it.
"""
if not LocalSDK.USE_STRUCTURED_DATASET.read():
logger.info(f"Structured datasets not enabled, not registering handler {h}")
return

lowest_level = cls._handler_finder(h)
if h.supported_format in lowest_level and override is False:
raise ValueError(f"Already registered a handler for {(h.python_type, h.protocol, h.supported_format)}")
Expand Down Expand Up @@ -724,9 +720,5 @@ def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
raise ValueError(f"StructuredDatasetTransformerEngine cannot reverse {literal_type}")


if LocalSDK.USE_STRUCTURED_DATASET.read():
logger.debug("Structured dataset module load... using structured datasets!")
flyte_dataset_transformer = StructuredDatasetTransformerEngine()
TypeEngine.register(flyte_dataset_transformer)
else:
logger.debug("Structured dataset module load... not using structured datasets")
flyte_dataset_transformer = StructuredDatasetTransformerEngine()
TypeEngine.register(flyte_dataset_transformer)
2 changes: 1 addition & 1 deletion plugins/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: test
test:
FLYTE_SDK_USE_STRUCTURED_DATASET=TRUE find . -maxdepth 1 -type d | grep 'flytekit-' | xargs -L1 pytest
find . -maxdepth 1 -type d | grep 'flytekit-' | xargs -L1 pytest

.PHONY: build_all_plugins
build_all_plugins:
Expand Down
26 changes: 13 additions & 13 deletions plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
from flytekit.configuration import internal
from flytekit.types.structured.structured_dataset import GCS, S3

from .arrow import ArrowToParquetEncodingHandler, ParquetToArrowDecodingHandler
from .pandas import PandasToParquetEncodingHandler, ParquetToPandasDecodingHandler
from .persist import FSSpecPersistence

if internal.LocalSDK.USE_STRUCTURED_DATASET.read():
from .arrow import ArrowToParquetEncodingHandler, ParquetToArrowDecodingHandler
from .pandas import PandasToParquetEncodingHandler, ParquetToPandasDecodingHandler

def _register(protocol: str):
logger.info(f"Registering fsspec {protocol} implementations and overriding default structured encoder/decoder.")
StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(protocol), True, True)
def _register(protocol: str):
logger.info(f"Registering fsspec {protocol} implementations and overriding default structured encoder/decoder.")
StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(protocol), True, True)
StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(protocol), True, True)

if importlib.util.find_spec("s3fs"):
_register(S3)

if importlib.util.find_spec("gcsfs"):
_register(GCS)
if importlib.util.find_spec("s3fs"):
_register(S3)

if importlib.util.find_spec("gcsfs"):
_register(GCS)
4 changes: 1 addition & 3 deletions plugins/flytekit-spark/flytekitplugins/spark/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from flytekit.configuration import internal as _internal

from .schema import SparkDataFrameSchemaReader, SparkDataFrameSchemaWriter, SparkDataFrameTransformer # noqa
from .sd_transformers import ParquetToSparkDecodingHandler, SparkToParquetEncodingHandler
from .task import Spark, new_spark_session # noqa

if _internal.LocalSDK.USE_STRUCTURED_DATASET.read():
from .sd_transformers import ParquetToSparkDecodingHandler, SparkToParquetEncodingHandler
1 change: 0 additions & 1 deletion plugins/flytekit-sqlalchemy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /app
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE

RUN pip install awscli
RUN pip install gsutil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV FLYTE_SDK_USE_STRUCTURED_DATASET TRUE

# This is necessary for opencv to work
RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg build-essential
Expand Down
Empty file.
53 changes: 0 additions & 53 deletions tests/flytekit_compatibility/test_schema_types.py

This file was deleted.

Loading