Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Liu <[email protected]>
  • Loading branch information
austin362667 committed Apr 19, 2024
1 parent 774c16e commit 601939c
Showing 1 changed file with 25 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import os
import typing
from dataclasses import dataclass
from tabulate import tabulate

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from tabulate import tabulate
from typing_extensions import Annotated

from flytekit import FlyteContext, FlyteContextManager, kwtypes, task, workflow
from flytekit import FlyteContext, FlyteContextManager, StructuredDataset, kwtypes, task, workflow
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.basic_dfs import CSVToPandasDecodingHandler, PandasToCSVEncodingHandler
from flytekit.types.structured.structured_dataset import (
CSV,
DF,
PARQUET,
StructuredDataset,
StructuredDatasetDecoder,
StructuredDatasetEncoder,
StructuredDatasetTransformerEngine,
)
from flytekit import ImageSpec, StructuredDataset, kwtypes, task, workflow

# Skip this whole test module at collection time when pandas is not installed.
pd = pytest.importorskip("pandas")

Expand Down Expand Up @@ -133,7 +131,6 @@ def decode(
# Register the CSV -> pandas decoding handler with the transformer engine so
# the tasks below can open CSV-backed StructuredDatasets as DataFrames.
StructuredDatasetTransformerEngine.register(CSVToPandasDecodingHandler())



## Case 1.
data = [
{
Expand Down Expand Up @@ -176,41 +173,36 @@ class CompanyField:
# Annotated StructuredDataset aliases whose column sets are declared via kwtypes;
# InfoField / ContactsField are dataclasses defined earlier in this file.
MySecondDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
MyNestedDataClassDataset = Annotated[StructuredDataset, kwtypes(info=kwtypes(contacts=ContactsField))]


@task()
def create_pd_table() -> StructuredDataset:
    """Flatten the module-level `data` records into a pandas DataFrame and
    return it wrapped in a StructuredDataset whose uri is PANDAS_PATH."""
    # max_level=0 keeps nested dicts as single columns instead of exploding them.
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", tabulate(df, headers="keys", tablefmt="psql"))

    return StructuredDataset(dataframe=df, uri=PANDAS_PATH)

@task()
def create_bq_table() -> StructuredDataset:
    """Flatten the module-level `data` records into a pandas DataFrame and
    return it wrapped in a StructuredDataset whose uri is BQ_PATH."""
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", tabulate(df, headers="keys", tablefmt="psql"))

    # Enable one of GCP `uri` below if you want. You can replace `uri` with your own google cloud endpoints.
    return StructuredDataset(dataframe=df, uri=BQ_PATH)


@task()
def create_np_table() -> StructuredDataset:
    """Flatten the module-level `data` records into a pandas DataFrame and
    return it wrapped in a StructuredDataset whose uri is NUMPY_PATH."""
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", tabulate(df, headers="keys", tablefmt="psql"))

    return StructuredDataset(dataframe=df, uri=NUMPY_PATH)

@task()
def create_ar_table() -> StructuredDataset:
df = pa.Table.from_pandas(pd.json_normalize(data, max_level=0))
print("original dataframe: \n", tabulate(df, headers='keys', tablefmt='psql'))
print("original dataframe: \n", tabulate(df, headers="keys", tablefmt="psql"))

return StructuredDataset(
dataframe=df,
Expand All @@ -220,49 +212,52 @@ def create_ar_table() -> StructuredDataset:
@task()
def print_table_by_arg(sd: MyArgDataset) -> pd.DataFrame:
    """Open the incoming MyArgDataset as a pandas DataFrame, print it, and
    return it so downstream tasks / assertions can inspect the columns."""
    t = sd.open(pd.DataFrame).all()
    print("MyArgDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    """Open the incoming MyDictDataset as a pandas DataFrame, print it, and
    return it."""
    t = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_list_dict(sd: MyDictListDataset) -> pd.DataFrame:
    """Open the incoming MyDictListDataset as a pandas DataFrame, print it,
    and return it."""
    t = sd.open(pd.DataFrame).all()
    print("MyDictListDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_top_dataclass(sd: MyTopDataClassDataset) -> pd.DataFrame:
    """Open the incoming MyTopDataClassDataset as a pandas DataFrame, print
    it, and return it."""
    t = sd.open(pd.DataFrame).all()
    print("MyTopDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_top_dict(sd: MyTopDictDataset) -> pd.DataFrame:
    """Open the incoming MyTopDictDataset as a pandas DataFrame, print it,
    and return it."""
    t = sd.open(pd.DataFrame).all()
    print("MyTopDictDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_second_dataclass(sd: MySecondDataClassDataset) -> pd.DataFrame:
    """Open the incoming MySecondDataClassDataset as a pandas DataFrame,
    print it, and return it."""
    t = sd.open(pd.DataFrame).all()
    print("MySecondDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@task()
def print_table_by_nested_dataclass(sd: MyNestedDataClassDataset) -> pd.DataFrame:
    """Open the incoming MyNestedDataClassDataset as a pandas DataFrame,
    print it, and return it."""
    t = sd.open(pd.DataFrame).all()
    print("MyNestedDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t


@workflow
def wf():
pd_sd = create_pd_table()
Expand Down Expand Up @@ -299,5 +294,6 @@ def wf():
print_table_by_nested_dataclass(sd=ar_sd)
return


def test_structured_dataset_wf():
    """Pytest entry point: run the whole workflow; any decode/encode mismatch
    inside the tasks raises and fails the test."""
    wf()

0 comments on commit 601939c

Please sign in to comment.