Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytectl] Use Protobuf Struct as dataclass Input for backward compatibility #5840

Merged
merged 1 commit into from
Oct 15, 2024

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Oct 11, 2024

Tracking issue

#5318

Why are the changes needed?

If an old flytekit image receives BINARY MSGPACK Inputs, it will fail, since it can't interpret it.

Note: for the upcoming flytekit, we can use protobuf struct as input and still make dataclass/pydantic basemodel works well.

What changes were proposed in this pull request?

Generate JSON String instead of MSGPACK BYTES when using yaml dictionary as input to create dataclass.

How was this patch tested?

execute a pydantic workflow by using flytectl (yaml dictionary as input).

from pydantic import BaseModel, Field
from typing import Dict, List
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
from flytekit import task, workflow, ImageSpec
from enum import Enum
import os


flytekit_hash = "35312016dfcd305b6969933895f719e6f202e744"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"
pydantic_plugin = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}#subdirectory=plugins/flytekit-pydantic-v2"

# Define custom image for the task
image = ImageSpec(packages=[flytekit, pydantic_plugin],
                            apt_packages=["git"],
                            registry="localhost:30000",
                         )

class Status(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

class InnerDC(BaseModel):
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = Field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/example.txt")])
    g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = Field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = Field(default_factory=lambda: {0: FlyteFile("s3://my-s3-bucket/example.txt"),
                                                             1: FlyteFile("s3://my-s3-bucket/example.txt"),
                                                             -1: FlyteFile("s3://my-s3-bucket/example.txt")})
    k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}})
    m: dict = Field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = Field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/example.txt"))
    o: FlyteDirectory = Field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    enum_status: Status = Status.PENDING


class DC(BaseModel):
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = Field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/example.txt")])
    g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = Field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = Field(default_factory=lambda: {0: FlyteFile("s3://my-s3-bucket/example.txt"),
                                                             1: FlyteFile("s3://my-s3-bucket/example.txt"),
                                                             -1: FlyteFile("s3://my-s3-bucket/example.txt")})
    k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}})
    m: dict = Field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = Field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/example.txt"))
    o: FlyteDirectory = Field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    inner_dc: InnerDC = Field(default_factory=lambda: InnerDC())
    enum_status: Status = Status.PENDING


@task(container_image=image)
def t_dc(dc: DC) -> DC:
    return dc
@task(container_image=image)
def t_inner(inner_dc: InnerDC):
    assert isinstance(inner_dc, InnerDC)

    expected_file_content = "Default content"

    # f: List[FlyteFile]
    for ff in inner_dc.f:
        assert isinstance(ff, FlyteFile)
        with open(ff, "r") as f:
            assert f.read() == expected_file_content
    # j: Dict[int, FlyteFile]
    for _, ff in inner_dc.j.items():
        assert isinstance(ff, FlyteFile)
        with open(ff, "r") as f:
            assert f.read() == expected_file_content
    # n: FlyteFile
    assert isinstance(inner_dc.n, FlyteFile)
    with open(inner_dc.n, "r") as f:
        assert f.read() == expected_file_content
    # o: FlyteDirectory
    assert isinstance(inner_dc.o, FlyteDirectory)
    assert not inner_dc.o.downloaded
    with open(os.path.join(inner_dc.o, "example.txt"), "r") as fh:
        assert fh.read() == expected_file_content
    assert inner_dc.o.downloaded
    print("Test InnerDC Successfully Passed")
    # enum: Status
    assert inner_dc.enum_status == Status.PENDING


@task(container_image=image)
def t_test_all_attributes(a: int, b: float, c: str, d: bool, e: List[int], f: List[FlyteFile], g: List[List[int]],
                          h: List[Dict[int, bool]], i: Dict[int, bool], j: Dict[int, FlyteFile],
                          k: Dict[int, List[int]], l: Dict[int, Dict[int, int]], m: dict,
                          n: FlyteFile, o: FlyteDirectory,
                          enum_status: Status
                          ):
    # Strict type checks for simple types
    assert isinstance(a, int), f"a is not int, it's {type(a)}"
    assert a == -1
    assert isinstance(b, float), f"b is not float, it's {type(b)}"
    assert isinstance(c, str), f"c is not str, it's {type(c)}"
    assert isinstance(d, bool), f"d is not bool, it's {type(d)}"

    # Strict type checks for List[int]
    assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]"

    # Strict type checks for List[FlyteFile]
    assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]"

    # Strict type checks for List[List[int]]
    assert isinstance(g, list) and all(
        isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g), "g is not List[List[int]]"

    # Strict type checks for List[Dict[int, bool]]
    assert isinstance(h, list) and all(
        isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h
    ), "h is not List[Dict[int, bool]]"

    # Strict type checks for Dict[int, bool]
    assert isinstance(i, dict) and all(
        isinstance(k, int) and isinstance(v, bool) for k, v in i.items()), "i is not Dict[int, bool]"

    # Strict type checks for Dict[int, FlyteFile]
    assert isinstance(j, dict) and all(
        isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items()), "j is not Dict[int, FlyteFile]"

    # Strict type checks for Dict[int, List[int]]
    assert isinstance(k, dict) and all(
        isinstance(k, int) and isinstance(v, list) and all(isinstance(i, int) for i in v) for k, v in
        k.items()), "k is not Dict[int, List[int]]"

    # Strict type checks for Dict[int, Dict[int, int]]
    assert isinstance(l, dict) and all(
        isinstance(k, int) and isinstance(v, dict) and all(
            isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items())
        for k, v in l.items()), "l is not Dict[int, Dict[int, int]]"

    # Strict type check for a generic dict
    assert isinstance(m, dict), "m is not dict"

    # Strict type check for FlyteFile
    assert isinstance(n, FlyteFile), "n is not FlyteFile"

    # Strict type check for FlyteDirectory
    assert isinstance(o, FlyteDirectory), "o is not FlyteDirectory"

    # Strict type check for Enum
    assert isinstance(enum_status, Status), "enum_status is not Status"

    print("All attributes passed strict type checks.")


@workflow
def wf(dc: DC):
    t_dc(dc=dc)
    t_inner(inner_dc=dc.inner_dc)
    t_test_all_attributes(a=dc.a, b=dc.b, c=dc.c,
                          d=dc.d, e=dc.e, f=dc.f,
                          g=dc.g, h=dc.h, i=dc.i,
                          j=dc.j, k=dc.k, l=dc.l,
                          m=dc.m, n=dc.n, o=dc.o,
                          enum_status=dc.enum_status
                          )

    t_test_all_attributes(a=dc.inner_dc.a, b=dc.inner_dc.b, c=dc.inner_dc.c,
                          d=dc.inner_dc.d, e=dc.inner_dc.e, f=dc.inner_dc.f,
                          g=dc.inner_dc.g, h=dc.inner_dc.h, i=dc.inner_dc.i,
                          j=dc.inner_dc.j, k=dc.inner_dc.k, l=dc.inner_dc.l,
                          m=dc.inner_dc.m, n=dc.inner_dc.n, o=dc.inner_dc.o,
                          enum_status=dc.inner_dc.enum_status
                          )

if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = os.path.realpath(__file__)
    input_val = '{"a": -1, "b": 3.14}'

    result = runner.invoke(pyflyte.main,
                           ["run", path, "wf", "--dc", input_val])
    print("Local Execution: ", result.output)

    result = runner.invoke(pyflyte.main,
                           ["run", "--remote", path, "wf", "--dc", input_val])
    print("Remote Execution: ", result.output)
iamRoleARN: ""
inputs:
    dc:
        a: 1
        b: 3.14
        c: example_string
envs: {}
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: wf_all.t_dc
version: kN1CQsBQEjJVuXel2yALXw
flytectl create execution --execFile build/PR/JSON/stacked_PRs/flytectl/create_task.yaml -p flytesnacks -d development

Setup process

Screenshots

image image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

#5763

Copy link

codecov bot commented Oct 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 36.34%. Comparing base (d062824) to head (5ca30d6).
Report is 7 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5840      +/-   ##
==========================================
- Coverage   36.35%   36.34%   -0.01%     
==========================================
  Files        1304     1304              
  Lines      110148   110137      -11     
==========================================
- Hits        40042    40034       -8     
+ Misses      65939    65936       -3     
  Partials     4167     4167              
Flag Coverage Δ
unittests-datacatalog 51.37% <ø> (ø)
unittests-flyteadmin 55.60% <ø> (ø)
unittests-flytecopilot 12.17% <ø> (ø)
unittests-flytectl 62.26% <ø> (+0.04%) ⬆️
unittests-flyteidl 7.14% <100.00%> (-0.03%) ⬇️
unittests-flyteplugins 53.35% <ø> (ø)
unittests-flytepropeller 42.02% <ø> (ø)
unittests-flytestdlib 55.35% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Future-Outlier Future-Outlier merged commit db23c3f into master Oct 15, 2024
50 checks passed
@Future-Outlier Future-Outlier deleted the revert-flytectl-msgpack-idl-change branch October 15, 2024 06:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants