
[BUG] [flytekit] StructuredDataset handling fails when using Azure Blob Storage #2709

Closed
2 tasks done
MorpheusXAUT opened this issue Jul 21, 2022 · 8 comments
Labels
bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers

Comments

@MorpheusXAUT
Contributor

MorpheusXAUT commented Jul 21, 2022

Describe the bug

The StructuredDataset implementation enabled by default in flyteorg/flytekit#885 lacks support for Azure Blob Storage. This results in an error when handling, e.g., a pd.DataFrame with Azure-backed storage.

[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/usr/local/lib/python3.8/dist-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/base_task.py", line 474, in dispatch_execute
        native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 756, in literal_map_to_kwargs
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], v) for k, v in python_types.items()}
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 756, in <dictcomp>
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], v) for k, v in python_types.items()}
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 720, in to_python_value
        return transformer.to_python_value(ctx, lv, expected_python_type)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/types/structured/structured_dataset.py", line 589, in to_python_value
        return self.open_as(ctx, sd_literal, expected_python_type, metad)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/types/structured/structured_dataset.py", line 668, in open_as
        decoder = self.get_decoder(df_type, protocol, sd.metadata.structured_dataset_type.format)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/types/structured/structured_dataset.py", line 363, in get_decoder
        return cls._finder(StructuredDatasetTransformerEngine.DECODERS, df_type, protocol, format)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/types/structured/structured_dataset.py", line 355, in _finder
        raise ValueError(f"Failed to find a handler for {df_type}, protocol {protocol}, fmt {format}")

Message:

    Failed to find a handler for <class 'pandas.core.frame.DataFrame'>, protocol abfs, fmt parquet

The workflows in question use flytekit v1.0.5.
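The message suggests that decoders are looked up by a combination of Python type, storage protocol and format, and that no entry exists for `abfs`. The following is a simplified, hypothetical model of such a lookup, not flytekit's actual code. The names `DECODERS`, `register_decoder` and `find_decoder` are illustrative assumptions. When only `file` and `s3` are registered, an `abfs` lookup fails with an error of the same shape as above.

```python
from typing import Callable, Dict, Tuple

# Hypothetical registry: (python type name, protocol, format) -> decoder callable.
DecoderKey = Tuple[str, str, str]
DECODERS: Dict[DecoderKey, Callable[[str], str]] = {}


def register_decoder(df_type: str, protocol: str, fmt: str, fn: Callable[[str], str]) -> None:
    """Register a decoder for one (type, protocol, format) combination."""
    DECODERS[(df_type, protocol, fmt)] = fn


def find_decoder(df_type: str, protocol: str, fmt: str) -> Callable[[str], str]:
    """Look up a decoder, raising an error shaped like the one in the traceback if none exists."""
    try:
        return DECODERS[(df_type, protocol, fmt)]
    except KeyError:
        raise ValueError(
            f"Failed to find a handler for {df_type}, protocol {protocol}, fmt {fmt}"
        ) from None


# In this toy setup only local and S3 storage have decoders.
for proto in ("file", "s3"):
    register_decoder("pandas.DataFrame", proto, "parquet", lambda uri: f"read {uri}")

print(find_decoder("pandas.DataFrame", "s3", "parquet")("s3://bucket/df.parquet"))
# read s3://bucket/df.parquet
try:
    find_decoder("pandas.DataFrame", "abfs", "parquet")
except ValueError as err:
    print(err)  # Failed to find a handler for pandas.DataFrame, protocol abfs, fmt parquet
```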

Expected behavior

StructuredDatasets and pd.DataFrames are handled correctly on Azure, i.e. when using the abfs protocol/adlfs (via stow).
Judging from #2684, users should also not have to supply the protocol themselves when using ABS; however, I assume that is covered by the changes suggested in that issue as well.
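A minimal sketch of what "not having to supply the protocol" could look like, assuming the protocol can be taken from the scheme of the storage URI (for example the raw output prefix). The helper `infer_protocol` and its fallback to `file` for scheme-less paths are hypothetical; this is not how flytekit necessarily implements it.

```python
from urllib.parse import urlparse


def infer_protocol(uri: str, default: str = "file") -> str:
    """Return the storage protocol of a URI, e.g. 'abfs' for abfs://... paths.

    Hypothetical helper: paths without a scheme are treated as local files.
    Note: Windows drive letters (C:\\...) would be misread as a scheme; not handled here.
    """
    scheme = urlparse(uri).scheme
    return scheme if scheme else default


print(infer_protocol("abfs://my-container/data/df.parquet"))  # abfs
print(infer_protocol("s3://my-bucket/data/df.parquet"))  # s3
print(infer_protocol("/tmp/df.parquet"))  # file
```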

Additional context to reproduce

  1. Set up a Flyte instance using Azure Blob Storage as its storage backend
  2. Register workflow handling pd.DataFrames using flytekit v1.0.5
  3. Try to execute workflow

Example regression test we've added to our Flyte test suite, covering our common use cases:

import pandas as pd
from flytekit import task, workflow


_START_RANGE_VALUE = 0
_END_RANGE_VALUE = 100
_DUMMY_DF = pd.DataFrame(
    {
        "quadkeys": list(range(_START_RANGE_VALUE, _END_RANGE_VALUE)),
    }
)


@task() 
def get_pandas_dataframe() -> pd.DataFrame:
    return _DUMMY_DF


@task()
def consume_pandas_dataframe(input_df: pd.DataFrame) -> None:
    for quadkey in input_df["quadkeys"]:
        # quadkeys are ints here, so compare directly (len() on an int raises TypeError)
        assert quadkey > -1


@task()
def consume_pandas_dataframe_return_str(input_df: pd.DataFrame) -> str:
    return_val = ""
    for quadkey in input_df["quadkeys"]:
        return_val += str(quadkey)
    return return_val


@task()
def consume_pandas_dataframe_return_pandas_dataframe(input_df: pd.DataFrame) -> pd.DataFrame:
    values = []
    for quadkey in list(reversed(input_df["quadkeys"])):
        values.append(str(quadkey))
    return pd.DataFrame({"quadkeys": values})


@workflow
def test_pandas_dataframe_consumption_and_returning() -> None:
    df1 = get_pandas_dataframe()
    consume_pandas_dataframe(input_df=df1)
    consume_pandas_dataframe_return_str(input_df=df1)
    consume_pandas_dataframe_return_pandas_dataframe(input_df=df1)

All three consuming tasks above fail with the error message shown.

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@MorpheusXAUT MorpheusXAUT added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels Jul 21, 2022
@MorpheusXAUT
Contributor Author

Unfortunately, quite a few of our workflows are blocked by this: they cannot be upgraded to a newer version of flytekit (which we need for some other fixes). This prevents us from running our workloads on Azure at the moment, or requires us to rewrite them to avoid pd.DataFrame.

As this issue is quite time-sensitive for us, we're willing to provide a patch ourselves if that would help.

Skimming through the flytekit StructuredDataset implementation, I believe we should be able to fix this relatively easily by extending the supported protocols to include abfs and registering handlers in a few places (basically everywhere the S3 protocol is handled).
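The idea of "registering handlers everywhere S3 is handled" can be illustrated with a toy registry driven by a single list of protocols, so that adding `abfs` once covers every registration. `SUPPORTED_PROTOCOLS`, `register_pandas_parquet_handlers` and the string-valued handlers are hypothetical stand-ins, not flytekit's API.

```python
from typing import Dict, List, Tuple

# Hypothetical: one list of supported protocols, so adding "abfs" here
# registers it everywhere "s3" is registered instead of patching each call site.
SUPPORTED_PROTOCOLS: List[str] = ["file", "s3", "abfs"]

# Toy registry of (python type, protocol, format) -> handler description.
Registry = Dict[Tuple[str, str, str], str]


def register_pandas_parquet_handlers(registry: Registry, protocols: List[str]) -> None:
    """Add one pandas <-> Parquet handler entry per protocol (illustrative only)."""
    for protocol in protocols:
        registry[("pandas.DataFrame", protocol, "parquet")] = f"pandas<->parquet via {protocol}"


handlers: Registry = {}
register_pandas_parquet_handlers(handlers, SUPPORTED_PROTOCOLS)
print(sorted(protocol for (_, protocol, _) in handlers))  # ['abfs', 'file', 's3']
```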

I assume we should wait for flyteorg/flytekit#1107 to be merged and base our changes on top of it, so that the default protocol handling works for ABS as well.

@pingsutw
Member

@MorpheusXAUT I'm trying to fix it ASAP.

@pingsutw
Member

> Skimming through the flytekit StructuredDataset implementation, I believe we should be able to fix this relatively easily by extending the supported protocols to include abfs and registering handlers in a few places (basically everywhere the S3 protocol is handled).

Yes, you are right. You can help us add abfs after we merge flyteorg/flytekit#1107

@MorpheusXAUT To work around the error, could you try adding the code below to your workflow script and running it again?

from flytekit.types.structured import basic_dfs
from flytekit.types.structured.structured_dataset import StructuredDatasetTransformerEngine

# Encoder: writes a returned pd.DataFrame as Parquet to abfs:// storage
StructuredDatasetTransformerEngine.register(basic_dfs.PandasToParquetEncodingHandler("abfs"), default_for_type=True, override=True)
# Decoder: reads Parquet from abfs:// storage back into a pd.DataFrame task input
StructuredDatasetTransformerEngine.register(basic_dfs.ParquetToPandasDecodingHandler("abfs"), default_for_type=True, override=True)
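As we read the `register` calls above, `override=True` lets a handler replace an existing registration for the same key instead of raising an error, and `default_for_type=True` makes the handler's protocol and format the default for plain `pd.DataFrame` values. The toy model below illustrates only that reading; `ToyEngine` and its `DuplicateHandlerError` are hypothetical and simplified, not flytekit's implementation.

```python
from typing import Dict, Tuple


class DuplicateHandlerError(ValueError):
    """Toy error: raised when a key is registered twice without override."""


class ToyEngine:
    """Hypothetical stand-in for a handler registry with override/default flags."""

    def __init__(self) -> None:
        self.handlers: Dict[Tuple[str, str, str], str] = {}
        self.defaults: Dict[str, Tuple[str, str]] = {}  # type -> (protocol, format)

    def register(self, df_type: str, protocol: str, fmt: str, handler: str,
                 default_for_type: bool = False, override: bool = False) -> None:
        key = (df_type, protocol, fmt)
        if key in self.handlers and not override:
            raise DuplicateHandlerError(f"Already registered: {key}")
        self.handlers[key] = handler
        if default_for_type:
            # Plain DataFrame values of this type now default to this protocol/format.
            self.defaults[df_type] = (protocol, fmt)


engine = ToyEngine()
engine.register("pandas.DataFrame", "s3", "parquet", "s3 handler", default_for_type=True)
engine.register("pandas.DataFrame", "abfs", "parquet", "abfs handler",
                default_for_type=True, override=True)
print(engine.defaults["pandas.DataFrame"])  # ('abfs', 'parquet')

try:
    engine.register("pandas.DataFrame", "s3", "parquet", "another s3 handler")
except DuplicateHandlerError as err:
    print(err)  # Already registered: ('pandas.DataFrame', 's3', 'parquet')
```

With `override=True`, running the same registration again (for example when the module is imported twice) replaces the entry instead of failing.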

@MorpheusXAUT
Contributor Author

MorpheusXAUT commented Jul 21, 2022

@pingsutw Thanks for looking into this so quickly, appreciated!

> Yes, you are right. You can help us add abfs after we merge flyteorg/flytekit#1107

Sure thing, will do! I'll open a PR once flyteorg/flytekit#1107 has been merged 🙂
I've cherry-picked the commits from the draft PR onto our internal version for testing and verification, and have already started working on a fix, which we can rebase onto master once that PR is merged.

> @MorpheusXAUT To work around the error, could you try adding the code below to your workflow script and running it again?

👍 Works for our regression test. I'll pass that along as a workaround for our other workflows that are failing, thanks!

@MorpheusXAUT
Contributor Author

@pingsutw I've got a fix ready (based on flyteorg/flytekit#1107) that solves this according to our internal tests. Would you prefer that I create a PR based on flyteorg/flytekit#1107 now, or wait for the other PR to be merged before submitting it?

@pingsutw
Member

@MorpheusXAUT Could you wait for the other PR to be merged before submitting it? I will ping you once we merge it.

@wild-endeavor
Contributor

Merging the other PR soon.

@wild-endeavor
Contributor

Merged.


3 participants