[Core feature] Allow nested fields in structured datasets #4241

Closed
2 tasks done
Tracked by #4064
dylanwilder opened this issue Oct 16, 2023 · 8 comments
Labels: enhancement (New feature or request), flytekit (FlyteKit Python related issue), hacktoberfest

Comments

@dylanwilder

Motivation: Why do you think this is important?

Most storage formats support nested field structures today (Avro, Parquet, BigQuery), but StructuredDatasets currently support only flat schemas. This prevents the use of common data modeling and organizational practices, or requires bypassing structured datasets to simply pass a URI, which prevents type checking.

Goal: What should the final outcome look like, ideally?

One or both of the following should be possible:

@dataclass_json
@dataclass
class RecordField:
    name: str
    age: int

Schema1 = Annotated[StructuredDataset, kwtypes(data=RecordField, ...)]
Schema2 = Annotated[StructuredDataset, kwtypes(data={"name": str, "age": int}, ...)]
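To make the equivalence of the two requested forms concrete, here is a minimal stdlib-only sketch that expands a dataclass column type into the same nested dict that the second form spells out by hand. The helper name to_nested_schema is hypothetical and is not part of flytekit; the sketch only illustrates the idea and says nothing about how flytekit would implement it.

```python
from dataclasses import dataclass, fields, is_dataclass


@dataclass
class RecordField:
    name: str
    age: int


def to_nested_schema(spec):
    """Expand dataclasses and dicts into a nested dict of column types.

    Hypothetical helper for illustration only; not a flytekit API.
    """
    if is_dataclass(spec):
        # A dataclass column becomes a nested dict of its fields.
        return {f.name: to_nested_schema(f.type) for f in fields(spec)}
    if isinstance(spec, dict):
        # A dict is already nested; recurse in case it contains dataclasses.
        return {name: to_nested_schema(tp) for name, tp in spec.items()}
    # Anything else is treated as a leaf column type (str, int, ...).
    return spec


schema1 = to_nested_schema({"data": RecordField})
schema2 = to_nested_schema({"data": {"name": str, "age": int}})
print(schema1 == schema2)
```

Under this reading, the dataclass form and the dict form describe the same nested column layout, so supporting either one would cover the other.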

Describe alternatives you've considered

As noted above, a task can return a URI instead, but this essentially bypasses the type system entirely.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@dylanwilder dylanwilder added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Oct 16, 2023
@pingsutw pingsutw added hacktoberfest flytekit FlyteKit Python related issue and removed untriaged This issues has not yet been looked at by the Maintainers labels Oct 16, 2023
@SophieTech88
Contributor

Can I try this issue?

@austin362667
Contributor

#take

@austin362667
Contributor

Hi! @dylanwilder cc @pingsutw

  • Is this issue about column type information or the serialized byte format?
  • Or is it about the storage driver and location, or additional third-party schema information?

I think the sample code provided above can already be resolved in Python.

  1. pyflyte register ./dataclass.py or pyflyte run ./dataclass.py dataclass_wf should pass.
import os
import tempfile
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
# If you’re using a Flytekit version below v1.10, decorate with @dataclass_json (from dataclasses_json import dataclass_json) instead of inheriting from Mashumaro’s DataClassJSONMixin.
from mashumaro.mixins.json import DataClassJSONMixin
# Add Annotated: https://docs.python.org/3/library/typing.html#typing.Annotated
from typing_extensions import Annotated

@dataclass
class DetailField():
    age: int
    sex: str
    
@dataclass
class RecordField():
    name: str
    detail: DetailField

# @dataclass_json
@dataclass
class FlyteTypes(DataClassJSONMixin):
    dataframe: Annotated[StructuredDataset, {"name": str, "detail": {"age": int, "sex": str}}]
    file: FlyteFile
    directory: FlyteDirectory


@task
def upload_data() -> FlyteTypes:
    """
    Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
    such as GCP or S3.
    """
    # 1. StructuredDataset
    df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]})

    # 2. FlyteDirectory
    temp_dir = tempfile.mkdtemp(prefix="flyte-")
    df.to_parquet(temp_dir + "/df.parquet")

    # 3. FlyteFile
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")
    file_path.close()  # flush to disk so the uploaded file has the full contents

    fs = FlyteTypes(
        dataframe=StructuredDataset(dataframe=df),
        file=FlyteFile(file_path.name),
        directory=FlyteDirectory(temp_dir),
    )
    return fs

@task
def download_data(res: FlyteTypes):
    assert pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]}).equals(res.dataframe.open(pd.DataFrame).all())
    with open(res.file, "r") as f:
        assert f.read() == "Hello, World!"
    assert os.listdir(res.directory) == ["df.parquet"]

@workflow
def dataclass_wf() -> (FlyteTypes):
    o1 = upload_data()
    download_data(res=o1)
    return o1

if __name__ == "__main__":
    dataclass_wf()
  2. pyflyte register ./dataclass.py or pyflyte run ./dataclass.py dataclass_wf should show a typing error.
    • Change the age field in Tom's nested detail field from 20 (int) to "20" (str).
 import os
 import tempfile
 from dataclasses import dataclass
 
 import pandas as pd
 from flytekit import task, workflow
 from flytekit.types.directory import FlyteDirectory
 from flytekit.types.file import FlyteFile
 from flytekit.types.structured import StructuredDataset
 # If you’re using a Flytekit version below v1.10, decorate with @dataclass_json (from dataclasses_json import dataclass_json) instead of inheriting from Mashumaro’s DataClassJSONMixin.
 from mashumaro.mixins.json import DataClassJSONMixin
 # Add Annotated: https://docs.python.org/3/library/typing.html#typing.Annotated
 from typing_extensions import Annotated
 
 @dataclass
 class DetailField():
     age: int
     sex: str
     
 @dataclass
 class RecordField():
     name: str
     detail: DetailField
 
 # @dataclass_json
 @dataclass
 class FlyteTypes(DataClassJSONMixin):
     dataframe: Annotated[StructuredDataset, {"name": str, "detail": {"age": int, "sex": str}}]
     file: FlyteFile
     directory: FlyteDirectory
 
 
 @task
 def upload_data() -> FlyteTypes:
     """
     Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
     such as GCP or S3.
     """
     # 1. StructuredDataset
     df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":"20", "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]})
 
     # 2. FlyteDirectory
     temp_dir = tempfile.mkdtemp(prefix="flyte-")
     df.to_parquet(temp_dir + "/df.parquet")
 
     # 3. FlyteFile
     file_path = tempfile.NamedTemporaryFile(delete=False)
     file_path.write(b"Hello, World!")
     file_path.close()  # flush to disk so the uploaded file has the full contents
 
     fs = FlyteTypes(
         dataframe=StructuredDataset(dataframe=df),
         file=FlyteFile(file_path.name),
         directory=FlyteDirectory(temp_dir),
     )
     return fs
 
 @task
 def download_data(res: FlyteTypes):
     assert pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]}).equals(res.dataframe.open(pd.DataFrame).all())
     with open(res.file, "r") as f:
         assert f.read() == "Hello, World!"
     assert os.listdir(res.directory) == ["df.parquet"]
 
 @workflow
 def dataclass_wf() -> (FlyteTypes):
     o1 = upload_data()
     download_data(res=o1)
     return o1
 
 if __name__ == "__main__":
     dataclass_wf()
  3. The following should work as well:
  # Schema = Annotated[StructuredDataset, RecordField]
  Schema = Annotated[StructuredDataset, {"name": str, "detail": {"age": int, "sex": str}}]

  @task
  def mytask() -> Schema:
     return StructuredDataset(dataframe=pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]}))
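The typing error expected in the second example can be sketched as a recursive check of one row against a nested dict schema. This is only an illustration of the intended behavior, not how flytekit validates StructuredDatasets; the function name check_record is hypothetical, and the schema keys are capitalized here so that they match the DataFrame columns used in the examples.

```python
def check_record(value, schema, path="row"):
    """Raise TypeError if `value` does not match the nested `schema`.

    Hypothetical helper for illustration only; not a flytekit API.
    """
    if isinstance(schema, dict):
        if not isinstance(value, dict):
            raise TypeError(f"{path}: expected a nested record, got {type(value).__name__}")
        for key, expected in schema.items():
            if key not in value:
                raise TypeError(f"{path}: missing field {key!r}")
            # Recurse so that nested fields are checked with a dotted path.
            check_record(value[key], expected, f"{path}.{key}")
    elif not isinstance(value, schema):
        raise TypeError(f"{path}: expected {schema.__name__}, got {type(value).__name__}")


schema = {"Name": str, "Detail": {"Age": int, "Sex": str}}
good = {"Name": "Tom", "Detail": {"Age": 20, "Sex": "M"}}
bad = {"Name": "Tom", "Detail": {"Age": "20", "Sex": "M"}}

check_record(good, schema)  # passes silently

try:
    check_record(bad, schema)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)  # row.Detail.Age: expected int, got str
```

The dotted path in the message is a design choice to make it clear which nested field failed.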

@dylanwilder
Author

@austin362667 I tested your code and it works for me, but it does not appear to be compatible with kwtypes. I think that's fine, except that kwtypes was historically the recommended way to specify a schema (not sure if that is still true). Perhaps this is just a documentation update?

@gitgraghu

gitgraghu commented Mar 7, 2024

@austin362667 we noticed that the schema stored in the Flyte workflow definition is empty, as shown below. Could you confirm that you are able to see the schema in the workflow definition as well? Also, we noticed this works locally, but not on Flyte.

  "type": {
    "structuredDatasetType": {
      "format": "parquet",
      "columns": [],
      "externalSchemaType": "",
      "externalSchemaBytes": ""
    }
  }
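One way to check for this condition is to walk a saved workflow definition and look for structuredDatasetType entries whose columns list is empty. The sketch below assumes the definition is available as JSON with the field names shown in the fragment above; how the definition is exported is not covered here, and the function name find_structured_dataset_types is hypothetical.

```python
import json

# The fragment from the comment above, wrapped in braces to make it valid JSON.
fragment = json.loads("""
{
  "type": {
    "structuredDatasetType": {
      "format": "parquet",
      "columns": [],
      "externalSchemaType": "",
      "externalSchemaBytes": ""
    }
  }
}
""")


def find_structured_dataset_types(node):
    """Yield every "structuredDatasetType" object found anywhere in `node`."""
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "structuredDatasetType" and isinstance(value, dict):
                yield value
            yield from find_structured_dataset_types(value)
    elif isinstance(node, list):
        for item in node:
            yield from find_structured_dataset_types(item)


# Keep only the entries that carry no column information.
empty = [sdt for sdt in find_structured_dataset_types(fragment) if not sdt.get("columns")]
print(f"{len(empty)} structured dataset type(s) without column information")
```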

@austin362667
Contributor

austin362667 commented Mar 9, 2024

Hi @gitgraghu , @dylanwilder

  • I think it can run on a Flyte cluster. Please check the attached screenshots and let me know if you find any unexpected behavior.
  • Regarding the workflow definition, I'm not sure what you mean by the workflow type. Are you asking whether Annotated StructuredDatasets are being set properly within the workflow? Could you please show me how to reproduce the bug?
> pyflyte run  --remote --image localhost:30000/basic:latest ./dataclass.py dataclass_wf
> [✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f04e025125ee14624bd1 to see execution in the console.

where the image contains the pandas package and is hosted in the localhost Docker registry
Running Execution on Remote.

[Screenshots: four images, 2024-03-10 at 3:32:53 AM, 3:33:12 AM, 3:33:24 AM, and 3:33:34 AM]
import os
import tempfile
from dataclasses import dataclass
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
# If you’re using a Flytekit version below v1.10, decorate with @dataclass_json (from dataclasses_json import dataclass_json) instead of inheriting from Mashumaro’s DataClassJSONMixin.
from mashumaro.mixins.json import DataClassJSONMixin
# Add Annotated: https://docs.python.org/3/library/typing.html#typing.Annotated
from typing_extensions import Annotated
from flytekit import ImageSpec, Resources, task

import pandas as pd

# custom_image = ImageSpec(
#     name="basic:latest",
#     registry="localhost:30000",
#     packages=["pandas", "numpy"], 
#     apt_packages=['git'],
#     python_version=3.11,
# )

# if custom_image.is_container():
    # import pandas as pd

@dataclass
class DetailField():
    age: int
    sex: str

@dataclass
class RecordField():
    name: str
    detail: DetailField

# Schema = Annotated[StructuredDataset, RecordField]
Schema = Annotated[StructuredDataset, {"name": str, "detail": {"age": int, "sex": str}}]

# @dataclass_json
@dataclass
class FlyteTypes(DataClassJSONMixin):
    dataframe: Schema
    file: FlyteFile
    directory: FlyteDirectory


@task(requests=Resources(cpu="1", mem="1Gi"))
def upload_data() -> FlyteTypes:
    """
    Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
    such as GCP or S3.
    """
    # 1. StructuredDataset
    df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]})

    # 2. FlyteDirectory
    temp_dir = tempfile.mkdtemp(prefix="flyte-")
    df.to_parquet(temp_dir + "/df.parquet")

    # 3. FlyteFile
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")
    file_path.close()  # flush to disk so the uploaded file has the full contents

    fs = FlyteTypes(
        dataframe=StructuredDataset(dataframe=df),
        file=FlyteFile(file_path.name),
        directory=FlyteDirectory(temp_dir),
    )
    print("upload_data:\n", fs.dataframe.dataframe)
    return fs

@task(requests=Resources(cpu="1", mem="1Gi"))
def download_data(res: FlyteTypes):
    assert pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]}).equals(res.dataframe.open(pd.DataFrame).all())
    with open(res.file, "r") as f:
        assert f.read() == "Hello, World!"
    assert os.listdir(res.directory) == ["df.parquet"]
    print("download_data:\n", res.dataframe.open(pd.DataFrame).all())
    
@task(requests=Resources(cpu="1", mem="1Gi"))
def mytask() -> Schema:
    df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}]})
    print('my_df:\n', df)
    return StructuredDataset(dataframe=df)
    # TODO: returning pd.DataFrame doesn't work, cc [Kevin Su](https://github.com/pingsutw)
    # return df 

@workflow
def dataclass_wf()-> (FlyteTypes):
    mytask()
    o1 = upload_data()
    download_data(res=o1)
    return o1

if __name__ == "__main__":
    dataclass_wf()

@gitgraghu

gitgraghu commented Mar 9, 2024

@austin362667 I feel the two ways of specifying the schema below are not working as expected and behave the same as not specifying a schema at all.

Schema = Annotated[StructuredDataset, {"Name": str, "Detail": {"Age": int, "Sex": str}}]
Schema = Annotated[StructuredDataset, RecordField]

Try the following example with different variations of Schema (json, dataclass and kwtypes). I have added a City column to the dataframe.

@task
def write_df() -> Schema:
    df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}], "City": ["Madrid","Paris","London"]})
    return StructuredDataset(dataframe=df)

@task
def read_df(structds: Schema):
    df = structds.open(pd.DataFrame).all()
    print(df)

@workflow
def struct_record_wf():
    out = write_df()
    read_df(structds=out)

Even if I specify only the Name and Detail fields in the schema, the open call returns all the data, which is equivalent to not specifying any schema. However, using kwtypes does respect the schema and reads only the columns specified.

Schema = Annotated[StructuredDataset, kwtypes(Name=str, City=str)]
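The difference described here can be modeled with plain Python: a non-empty column list selects only those columns, while an empty column list (which is what the empty "columns" in the workflow definition amounts to) returns everything. This is a sketch of the observed behavior only, not flytekit's reader; project_columns is a hypothetical helper.

```python
def project_columns(table, schema_columns):
    """Return only the columns named in the schema; an empty schema keeps everything.

    Hypothetical helper for illustration only; not a flytekit API.
    """
    if not schema_columns:
        return dict(table)
    return {name: table[name] for name in schema_columns}


table = {
    "Name": ["Tom", "Joseph", "Alyssa"],
    "Detail": [{"Age": 20, "Sex": "M"}, {"Age": 22, "Sex": "M"}, {"Age": 24, "Sex": "F"}],
    "City": ["Madrid", "Paris", "London"],
}

# Schema with columns, as with kwtypes(Name=str, City=str).
with_schema = project_columns(table, ["Name", "City"])
# Schema whose column list ended up empty.
without_schema = project_columns(table, [])

print(list(with_schema))     # ['Name', 'City']
print(list(without_schema))  # ['Name', 'Detail', 'City']
```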

@austin362667
Contributor

@gitgraghu You're right! Thanks for reporting, working on it now.
