Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotated StructuredDataset: support nested_types #2252

Merged

Conversation

austin362667
Copy link
Collaborator

@austin362667 austin362667 commented Mar 10, 2024

Tracking issue

flyteorg/flyte#4241

Why are the changes needed?

Currently StructuredDatasets only support flat schemas.
This PR aims to support nested types as form of dict/json, dataclass, named args/kwargs.

What changes were proposed in this pull request?

  1. a flatten_dict() tool function in structured_dataset.py

    • Must use kwtypes() to pass in types. Check comments.
  2. After we get a list of SUPPORTED_TYPES, we select them by series of key joined by ..

    • how to flat? e.g.,
    {
      "a": {
      "b": {
            "c": {
                "d": "vvv"
            }
          }
        },
        "e.f": "www",
    }

    to {'a.b.c.d': 'vvv', 'e.f': 'www'}

    • Can not select by the sub keys. like "c.d".

How was this patch tested?

please take a look at screenshots and examples below.

Setup process

from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

# flytekit_dev_version = "https://github.com/austin362667/flytekit.git@f5cd70dd053e6f3d4aaf5b90d9c4b28f32c0980a"
# image = ImageSpec(
#     packages=[
#         "pandas",
#         "google-cloud-bigquery",
#         "google-cloud-bigquery-storage",
#         f"git+{flytekit_dev_version}",
#         f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
#     ],
#     apt_packages=["git"],
#     source_root="./keys",
#     env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
#     platform="linux/arm64",
#     registry="localhost:30000",
# )


## Case 1.
data = [
    {
        'company': 'XYZ pvt ltd',
        'location': 'London',
        'info': {
            'president': 'Rakesh Kapoor',
            'contacts': {
                'email': '[email protected]',
                'tel': '9876543210'
            }
        }
    },
    {
        'company': 'ABC pvt ltd',
        'location': 'USA',
        'info': {
            'president': 'Kapoor Rakesh',
            'contacts': {
                'email': '[email protected]',
                'tel': '0123456789'
            }
        }
    }
]

@dataclass
class ContactsField():
    email: str
    tel: str

@dataclass
class InfoField():
    president: str
    contacts: ContactsField

@dataclass
class CompanyField():
    location: str
    info: InfoField
    company: str


MyArgDataset = Annotated[StructuredDataset, kwtypes(company = str)]
MyDictDataset = Annotated[StructuredDataset, kwtypes(info = {"contacts": {"tel": str}})]
MyDictListDataset = Annotated[StructuredDataset, kwtypes(info = {"contacts": {"tel": str, "email": str}})]
MyTopDataClassDataset = Annotated[StructuredDataset, kwtypes( CompanyField )]
MySecondDataClassDataset = Annotated[StructuredDataset, kwtypes(info = InfoField)]
MyNestedDataClassDataset = Annotated[StructuredDataset, kwtypes(info = kwtypes(contacts = ContactsField))]

@task()
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", df)


    # Enable one of GCP uri below if you want!
    return StructuredDataset(
        dataframe=df,
        # uri= "gs://flyte_austin362667_bucket/nested_types"
        # uri= "bq://flyte-austin362667-gcp:dataset.nested_type"
    )

@task()
def print_table_by_arg(sd: MyArgDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyArgDataset dataframe: \n", t)
    return t

@task()
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", t)
    return t

@task()
def print_table_by_list_dict(sd: MyDictListDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictListDataset dataframe: \n", t)
    return t

@task()
def print_table_by_top_dataclass(sd: MyTopDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyTopDataClassDataset dataframe: \n", t)
    return t

@task()
def print_table_by_second_dataclass(sd: MySecondDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MySecondDataClassDataset dataframe: \n", t)
    return t

@task()
def print_table_by_nested_dataclass(sd: MyNestedDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyNestedDataClassDataset dataframe: \n", t)
    return t

@workflow
def contacts_wf():
    sd = create_bq_table()
    print_table_by_arg(sd=sd)
    print_table_by_dict(sd=sd)
    print_table_by_list_dict(sd=sd)
    print_table_by_top_dataclass(sd=sd)
    print_table_by_second_dataclass(sd=sd)
    print_table_by_nested_dataclass(sd=sd)
    return 



## Case 2.
@dataclass
class Levels():
    # level1: str
    level2: str

Schema = Annotated[StructuredDataset, kwtypes(age=int, levels=Levels)]

@task()
def mytask_w() -> StructuredDataset:
    df = pd.DataFrame({
        "age": [1, 2],
        "levels": [
            {"level1": "1", "level2": "2"},
            {"level1": "2", "level2": "4"}
        ]
    })
    return StructuredDataset(dataframe=df)

# Should only show level2 string..
@task()
def mytask_r(sd: Schema) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("dataframe: \n", t)
    return t


@workflow
def levels_wf():
    sd = mytask_w()
    mytask_r(sd=sd)

Screenshots

Screenshot 2024-03-27 at 4 31 18 PM Screenshot 2024-03-15 at 6 30 58 PM

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Mar 10, 2024
@austin362667

This comment was marked as outdated.

@austin362667

This comment was marked as outdated.

@austin362667

This comment was marked as resolved.

@austin362667

This comment was marked as outdated.

@austin362667

This comment was marked as outdated.

@austin362667
Copy link
Collaborator Author

@gitgraghu , @dylanwilder
Please help me check with any unexpected behavior. Thank you~
cc @pingsutw

Copy link

codecov bot commented Mar 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 97.40%. Comparing base (bf38b8e) to head (facdeb8).
Report is 43 commits behind head on master.

❗ Current head facdeb8 differs from pull request most recent head a6e469a. Consider uploading reports for the commit a6e469a to get more accurate results

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2252       +/-   ##
===========================================
+ Coverage   83.04%   97.40%   +14.36%     
===========================================
  Files         324        9      -315     
  Lines       24861      231    -24630     
  Branches     3547        0     -3547     
===========================================
- Hits        20645      225    -20420     
+ Misses       3591        6     -3585     
+ Partials      625        0      -625     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pingsutw
Copy link
Member

@austin362667 it doesn't work for me. Try below example,

from typing import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

data = {
    'company': 'XYZ pvt ltd',
    'location': 'London',
    'info': {
        'president': 'Rakesh Kapoor',
        'contacts': {
            'email': '[email protected]',
            'tel': '9876543210'
        }
    }
}


# MyDataset = Annotated[StructuredDataset, kwtypes(company=str)]
MyDataset = Annotated[StructuredDataset, kwtypes(info={"president": str})]

@task
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("dataframe: \n", df)
    return StructuredDataset(dataframe=df)
    # return StructuredDataset(
    #     dataframe=df, uri="bq://dogfood-gcp-dataplane:dataset.nested_type"
    # )


@task
def print_table(sd: MyDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print(t)
    return t


@workflow
def wf():
    sd = create_bq_table()
    print_table(sd=sd)

@austin362667 austin362667 marked this pull request as draft March 14, 2024 03:31
@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch 2 times, most recently from 9c1dc41 to 664c32a Compare March 14, 2024 18:24
@austin362667
Copy link
Collaborator Author

austin362667 commented Mar 14, 2024

@austin362667 it doesn't work for me. Try below example,

@pingsutw Thank you for providing me useful use cases.
These examples can now work in remote mode.
And bigquery or pyarrow is still the tricky part.

@austin362667

This comment was marked as outdated.

@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch 7 times, most recently from bab8b55 to 4d95e19 Compare March 16, 2024 11:44
@austin362667 austin362667 changed the title Annotated StructuredDataset: add nested types support Annotated StructuredDataset: add nested_types support Mar 16, 2024
@austin362667 austin362667 marked this pull request as ready for review March 16, 2024 12:20
@austin362667
Copy link
Collaborator Author

austin362667 commented Mar 16, 2024

@gitgraghu @dylanwilder
Feel free to give it a try if it works in your GCP bq scenario. Thank you!

It worked when I passed it in my own Bucket and BigQuery with my GCP account, both in local and remote modes. However, I'm not 100% sure about other cases.

from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

flytekit_dev_version = "https://github.com/austin362667/flytekit.git@90a19fc51d1b0eb77b020140810883a317432675"
image = ImageSpec(
    packages=[
        "pandas",
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        f"git+{flytekit_dev_version}",
        f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
    ],
    apt_packages=["git"],
    files=["./keys/gcp-service-account.json"],
    env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
    platform="linux/arm64",
    registry="localhost:30000",
)

data = [{
    'company': 'XYZ pvt ltd',
    'location': 'London',
    'info': {
        'president': 'Rakesh Kapoor',
        'contacts': {
            'email': '[email protected]',
            'tel': '9876543210'
        }
    }
},
{
    'company': 'ABC pvt ltd',
    'location': 'USA',
    'info': {
        'president': 'Kapoor Rakesh',
        'contacts': {
            'email': '[email protected]',
            'tel': '0123456789'
        }
    }
}
]

@dataclass
class ContactsField():
    # email: str
    tel: str

@dataclass
class InfoField():
    # president: str
    contacts: ContactsField

@dataclass
class CompanyField():
    company: str
    # location: str
    # info: InfoField

# MyArgDataset = Annotated[StructuredDataset, kwtypes(company=str)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str})]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(CompanyField)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str, "contacts":{"email":str}})]
MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"contacts":{"tel":str}})]
MyDataClassDataset = Annotated[StructuredDataset, kwtypes(info=kwtypes(contacts=ContactsField))]

@task(container_image=image)
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", df)
    # return StructuredDataset(dataframe=df)
    return StructuredDataset(
        dataframe=df,
        # uri="gs://flyte_austin362667_bucket/nested_types"
        uri= "bq://flyte-austin362667-gcp:dataset.nested_type"
    )

@task(container_image=image)
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", t)
    return t

@task(container_image=image)
def print_table_by_dataclass(sd: MyDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDataClassDataset dataframe: \n", t)
    return t

@workflow
def wf():
    sd = create_bq_table()
    print_table_by_dict(sd=sd)
    print_table_by_dataclass(sd=sd)

@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch from 4d95e19 to 90a19fc Compare March 16, 2024 16:55
Signed-off-by: Austin Liu <[email protected]>

wip

Signed-off-by: Austin Liu <[email protected]>

fmt

Signed-off-by: Austin Liu <[email protected]>

fix

Signed-off-by: Austin Liu <[email protected]>

fix

Signed-off-by: Austin Liu <[email protected]>

fix

Signed-off-by: Austin Liu <[email protected]>
@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch from f5cd70d to d2e0821 Compare April 15, 2024 14:14
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. and removed size:M This PR changes 30-99 lines, ignoring generated files. labels Apr 15, 2024
@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch from d2e0821 to 8c8cad4 Compare April 15, 2024 14:18
Signed-off-by: Austin Liu <[email protected]>

fmt

Signed-off-by: Austin Liu <[email protected]>
@austin362667 austin362667 force-pushed the austin362667/structured_datasets branch from 8c8cad4 to bf892f7 Compare April 15, 2024 16:03
my_cols = kwtypes(Name=str, Age=int)
my_dataclass_cols = kwtypes(MyCols)
my_dict_cols = kwtypes({"Name": str, "Age": int})
fields = [("Name", pa.string()), ("Age", pa.int32())]
arrow_schema = pa.schema(fields)
pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add more nested dataframes to cover extreme test cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just add your example to the unit test? we can add a new file (test_structured_dataset_workflow_with_nested_type.py) to tests/flytekit/unit/types/structured_dataset

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

my_cols = kwtypes(Name=str, Age=int)
my_dataclass_cols = kwtypes(MyCols)
my_dict_cols = kwtypes({"Name": str, "Age": int})
fields = [("Name", pa.string()), ("Age", pa.int32())]
arrow_schema = pa.schema(fields)
pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just add your example to the unit test? we can add a new file (test_structured_dataset_workflow_with_nested_type.py) to tests/flytekit/unit/types/structured_dataset

flytekit/types/structured/structured_dataset.py Outdated Show resolved Hide resolved
Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Austin Liu <[email protected]>
austin362667 and others added 3 commits April 26, 2024 17:14
Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})


class MockBQEncodingHandlers(StructuredDatasetEncoder):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this class to ./types/structured_dataset/conftest.py, and add fixture decorator to it?

Copy link
Collaborator Author

@austin362667 austin362667 Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does pytest.fixture support for class?

Screenshot 2024-04-30 at 1 57 31 PM

Or I can just remove the useless code.

)


class MockBQDecodingHandlers(StructuredDatasetDecoder):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Collaborator Author

@austin362667 austin362667 Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd better just remove the useless duplicated code (MockBQEncodingHandlers , MockBQDecodingHandlers).

pingsutw and others added 2 commits April 30, 2024 10:23
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Austin Liu <[email protected]>
Copy link
Member

@pingsutw pingsutw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@pingsutw pingsutw merged commit 0cc8bbc into flyteorg:master Apr 30, 2024
47 checks passed
austin362667 added a commit to austin362667/flytekit that referenced this pull request May 9, 2024
Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
austin362667 added a commit to austin362667/flytekit that referenced this pull request May 10, 2024
Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
fiedlerNr9 pushed a commit that referenced this pull request Jul 25, 2024
Signed-off-by: Austin Liu <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:L This PR changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants