diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml
index 88001991f7..fabb2e2983 100644
--- a/.github/workflows/pythonbuild.yml
+++ b/.github/workflows/pythonbuild.yml
@@ -86,6 +86,7 @@ jobs:
           - flytekit-snowflake
           - flytekit-spark
           - flytekit-sqlalchemy
+          - flytekit-whylogs
         exclude:
           # flytekit-modin depends on ray which does not have a 3.10 wheel yet.
           # Issue tracked in https://github.com/ray-project/ray/issues/19116.
@@ -103,6 +104,12 @@
             plugin-names: "flytekit-onnx-scikitlearn"
           - python-version: 3.10
             plugin-names: "flytekit-onnx-tensorflow"
+          # whylogs-sketching library does not have a 3.10 build yet
+          # Issue tracked: https://github.com/whylabs/whylogs/issues/697
+          - python-version: 3.10
+            plugin-names: "flytekit-whylogs"
+
+
     steps:
       - uses: actions/checkout@v2
       - name: Set up Python ${{ matrix.python-version }}
diff --git a/plugins/flytekit-whylogs/README.md b/plugins/flytekit-whylogs/README.md
new file mode 100644
index 0000000000..aeaff969e5
--- /dev/null
+++ b/plugins/flytekit-whylogs/README.md
@@ -0,0 +1,57 @@
+# Flytekit whylogs Plugin
+
+whylogs is an open source library for logging any kind of data. With whylogs,
+you can generate summaries of datasets (called whylogs profiles) that can
+be used to:
+
+- Create data constraints to check whether your data looks the way it should
+- Quickly visualize key summary statistics about a dataset
+- Track changes in a dataset over time
+
+To install the plugin, run:
+
+```bash
+pip install flytekitplugins-whylogs
+```
+
+To generate profiles, you can add a task like the following:
+
+```python
+import pandas as pd
+import whylogs as why
+from whylogs.core import DatasetProfileView
+
+from flytekit import task
+
+
+@task
+def profile(df: pd.DataFrame) -> DatasetProfileView:
+    result = why.log(df)  # Various overloads for different common data types exist
+    profile = result.view()
+    return profile
+```
+
+>**NOTE:** You'll be passing around `DatasetProfileView` from tasks, not `DatasetProfile`.
+
+## Validating Data
+
+A common step in data pipelines is data validation, which `whylogs` supports
+through its constraints feature. You can fail a task whenever the data flowing
+through the workflow doesn't conform to the configured constraints, such as
+min/max values or data types on individual features.
+
+```python
+from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector
+
+
+@task
+def validate_data(profile: DatasetProfileView):
+    column = profile.get_column("my_column")
+    print(column.to_summary_dict())  # Shows the metrics you can validate against
+    builder = ConstraintsBuilder(profile)
+    num_constraint = MetricConstraint(
+        name='numbers between 0 and 4 only',
+        condition=lambda x: x.min > 0 and x.max < 4,
+        metric_selector=MetricsSelector(metric_name='distribution', column_name='my_column'))
+    builder.add_constraint(num_constraint)
+    constraints = builder.build()
+    valid = constraints.validate()
+
+    if not valid:
+        raise Exception("Invalid data found")
+```
+
+Check out our [constraints notebook](https://github.com/whylabs/whylogs/blob/1.0.x/python/examples/basic/MetricConstraints.ipynb) for more examples.
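+## Rendering Reports in Flyte Decks
+
+The plugin also ships two renderers, `WhylogsSummaryDriftRenderer` and
+`WhylogsConstraintsRenderer`, which turn profiles into HTML that can be attached
+to a Flyte Deck. The sketch below is illustrative rather than prescriptive:
+`my_column` is a placeholder column name, and depending on your flytekit version
+you may need to enable decks on the task (for example via `disable_deck=False`).
+
+```python
+import flytekit
+import pandas as pd
+import whylogs as why
+from flytekit import task
+from flytekitplugins.whylogs import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
+from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector
+
+
+@task(disable_deck=False)  # deck flag may differ across flytekit versions
+def constraints_report(df: pd.DataFrame) -> bool:
+    profile_view = why.log(df).view()
+    builder = ConstraintsBuilder(profile_view)
+    builder.add_constraint(
+        MetricConstraint(
+            name="my_column is between 0 and 4",
+            condition=lambda x: x.min > 0 and x.max < 4,
+            metric_selector=MetricsSelector(metric_name="distribution", column_name="my_column"),
+        )
+    )
+    constraints = builder.build()
+    # Render a pass/fail table for every constraint into this task's deck
+    flytekit.Deck("constraints", WhylogsConstraintsRenderer.to_html(constraints=constraints))
+    return constraints.validate()
+
+
+@task(disable_deck=False)
+def drift_report(new_data: pd.DataFrame, reference_data: pd.DataFrame):
+    # Profile both DataFrames and compare the target (new) data against the reference
+    html = WhylogsSummaryDriftRenderer.to_html(reference_data=reference_data, target_data=new_data)
+    flytekit.Deck("summary drift", html)
+```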
diff --git a/plugins/flytekit-whylogs/flytekitplugins/whylogs/__init__.py b/plugins/flytekit-whylogs/flytekitplugins/whylogs/__init__.py
new file mode 100644
index 0000000000..ca368cba3d
--- /dev/null
+++ b/plugins/flytekit-whylogs/flytekitplugins/whylogs/__init__.py
@@ -0,0 +1,4 @@
+from .renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
+from .schema import WhylogsDatasetProfileTransformer
+
+__all__ = ["WhylogsDatasetProfileTransformer", "WhylogsConstraintsRenderer", "WhylogsSummaryDriftRenderer"]
diff --git a/plugins/flytekit-whylogs/flytekitplugins/whylogs/renderer.py b/plugins/flytekit-whylogs/flytekitplugins/whylogs/renderer.py
new file mode 100644
index 0000000000..70d5ccbbae
--- /dev/null
+++ b/plugins/flytekit-whylogs/flytekitplugins/whylogs/renderer.py
@@ -0,0 +1,65 @@
+import whylogs as why
+from pandas import DataFrame
+from whylogs.core.constraints import Constraints
+from whylogs.viz import NotebookProfileVisualizer
+
+
+class WhylogsSummaryDriftRenderer:
+    """
+    Creates a whylogs Summary Drift report from two pandas DataFrames: a reference
+    DataFrame and a target DataFrame, i.e. the data that will be compared against
+    the reference.
+    """
+
+    @staticmethod
+    def to_html(reference_data: DataFrame, target_data: DataFrame) -> str:
+        """
+        Profiles both input DataFrames and generates an HTML report with the
+        Summary Drift calculations for all of their columns.
+
+        :param reference_data: The DataFrame that will be the reference for the drift report
+        :type reference_data: pandas.DataFrame
+
+        :param target_data: The data to compare against and create the Summary Drift report
+        :type target_data: pandas.DataFrame
+        """
+
+        target_view = why.log(target_data).view()
+        reference_view = why.log(reference_data).view()
+        viz = NotebookProfileVisualizer()
+        viz.set_profiles(target_profile_view=target_view, reference_profile_view=reference_view)
+        return viz.summary_drift_report().data
+
+
+class WhylogsConstraintsRenderer:
+    """
+    Creates a whylogs Constraints report from a `Constraints` object. The current API
+    requires a profiled DataFrame in order to build the constraints. The report renders
+    an HTML page that lets users check which constraints passed or failed. An example
+    constraints object can be defined as follows:
+
+    .. code-block:: python
+
+        profile_view = why.log(df).view()
+        builder = ConstraintsBuilder(profile_view)
+        num_constraint = MetricConstraint(
+            name=f'numbers between {min_value} and {max_value} only',
+            condition=lambda x: x.min > min_value and x.max < max_value,
+            metric_selector=MetricsSelector(
+                metric_name='distribution',
+                column_name='sepal_length'
+            )
+        )
+
+        builder.add_constraint(num_constraint)
+        constraints = builder.build()
+
+    Each Constraints object (builder.build() in the example above) can hold as many
+    constraints as desired.
+    If you want to learn more, check out our docs and examples at https://whylogs.readthedocs.io/
+    """
+
+    @staticmethod
+    def to_html(constraints: Constraints) -> str:
+        viz = NotebookProfileVisualizer()
+        report = viz.constraints_report(constraints=constraints)
+        return report.data
diff --git a/plugins/flytekit-whylogs/flytekitplugins/whylogs/schema.py b/plugins/flytekit-whylogs/flytekitplugins/whylogs/schema.py
new file mode 100644
index 0000000000..71247255f7
--- /dev/null
+++ b/plugins/flytekit-whylogs/flytekitplugins/whylogs/schema.py
@@ -0,0 +1,50 @@
+from typing import Type
+
+from whylogs.core import DatasetProfileView
+
+from flytekit import BlobType, FlyteContext
+from flytekit.extend import T, TypeEngine, TypeTransformer
+from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
+from flytekit.models.types import LiteralType
+
+
+class WhylogsDatasetProfileTransformer(TypeTransformer[DatasetProfileView]):
+    """
+    Transforms whylogs DatasetProfileView objects to and from a single binary Blob literal
+    """
+
+    _TYPE_INFO = BlobType(format="binary", dimensionality=BlobType.BlobDimensionality.SINGLE)
+
+    def __init__(self):
+        super(WhylogsDatasetProfileTransformer, self).__init__("whylogs-profile-transformer", t=DatasetProfileView)
+
+    def get_literal_type(self, t: Type[DatasetProfileView]) -> LiteralType:
+        return LiteralType(blob=self._TYPE_INFO)
+
+    def to_literal(
+        self,
+        ctx: FlyteContext,
+        python_val: DatasetProfileView,
+        python_type: Type[DatasetProfileView],
+        expected: LiteralType,
+    ) -> Literal:
+        remote_path = ctx.file_access.get_random_remote_directory()
+        local_dir = ctx.file_access.get_random_local_path()
+        python_val.write(local_dir)
+        ctx.file_access.upload(local_dir, remote_path)
+        return Literal(scalar=Scalar(blob=Blob(uri=remote_path, metadata=BlobMetadata(type=self._TYPE_INFO))))
+
+    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[DatasetProfileView]) -> T:
+        local_dir = ctx.file_access.get_random_local_path()
+        ctx.file_access.download(lv.scalar.blob.uri, local_dir)
+        return DatasetProfileView.read(local_dir)
+
+    def to_html(
+        self, ctx: FlyteContext, python_val: DatasetProfileView, expected_python_type: Type[DatasetProfileView]
+    ) -> str:
+        pandas_profile = str(python_val.to_pandas().to_html())
+        header = str("<h1>Profile View</h1> 
\n") + return header + pandas_profile + + +TypeEngine.register(WhylogsDatasetProfileTransformer()) diff --git a/plugins/flytekit-whylogs/requirements.in b/plugins/flytekit-whylogs/requirements.in new file mode 100644 index 0000000000..6784613a89 --- /dev/null +++ b/plugins/flytekit-whylogs/requirements.in @@ -0,0 +1,2 @@ +. +-e file:.#egg=flytekitplugins-whylogs diff --git a/plugins/flytekit-whylogs/requirements.txt b/plugins/flytekit-whylogs/requirements.txt new file mode 100644 index 0000000000..9001bc05e0 --- /dev/null +++ b/plugins/flytekit-whylogs/requirements.txt @@ -0,0 +1,26 @@ +# +# This file is autogenerated by pip-compile with python 3.9 +# To update, run: +# +# pip-compile requirements.in +# +-e file:.#egg=flytekitplugins-whylogs + # via -r requirements.in +flake8==4.0.1 + # via whylogs +mccabe==0.6.1 + # via flake8 +protobuf==3.20.1 + # via + # flytekitplugins-whylogs + # whylogs +pycodestyle==2.8.0 + # via flake8 +pyflakes==2.4.0 + # via flake8 +typing-extensions==4.3.0 + # via whylogs +whylogs==1.0.6 + # via flytekitplugins-whylogs +whylogs-sketching==3.4.1.dev2 + # via whylogs diff --git a/plugins/flytekit-whylogs/setup.py b/plugins/flytekit-whylogs/setup.py new file mode 100644 index 0000000000..8ce062a728 --- /dev/null +++ b/plugins/flytekit-whylogs/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup + +PLUGIN_NAME = "whylogs" + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["protobuf>=3.15,<4.0.0", "whylogs", "whylogs[viz]"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="whylabs", + author_email="support@whylabs.ai", + description="Enable the use of whylogs profiles to be used in flyte tasks to get aggregate statistics about data.", + url="https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-whylogs", + long_description=open("README.md").read(), + long_description_content_type="text/markdown", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.7", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.8", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/plugins/flytekit-whylogs/tests/__init__.py b/plugins/flytekit-whylogs/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/flytekit-whylogs/tests/test_renderer.py b/plugins/flytekit-whylogs/tests/test_renderer.py new file mode 100644 index 0000000000..2f51f34986 --- /dev/null +++ b/plugins/flytekit-whylogs/tests/test_renderer.py @@ -0,0 +1,73 @@ +from typing import Optional + +import numpy as np +import pandas as pd +import whylogs as why +from flytekitplugins.whylogs.renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer +from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector + +import flytekit +from flytekit import task, workflow + + +@task +def make_data(n_rows: int) -> pd.DataFrame: + data = { + "sepal_length": np.random.random_sample(n_rows), + "sepal_width": np.random.random_sample(n_rows), + "petal_length": np.random.random_sample(n_rows), + "petal_width": 
np.random.random_sample(n_rows), + "species": np.random.choice(["virginica", "setosa", "versicolor"], n_rows), + "species_id": np.random.choice([1, 2, 3], n_rows), + } + return pd.DataFrame(data) + + +@task +def run_constraints(df: pd.DataFrame, min_value: Optional[float] = 0.0, max_value: Optional[float] = 4.0) -> bool: + # This API constraints workflow is very flexible but a bit cumbersome. + # It will be simplified in the future, so for now we'll stick with injecting + # a Constraints object to the renderer. + profile_view = why.log(df).view() + builder = ConstraintsBuilder(profile_view) + num_constraint = MetricConstraint( + name=f"numbers between {min_value} and {max_value} only", + condition=lambda x: x.min > min_value and x.max < max_value, + metric_selector=MetricsSelector(metric_name="distribution", column_name="sepal_length"), + ) + + builder.add_constraint(num_constraint) + constraints = builder.build() + + renderer = WhylogsConstraintsRenderer() + flytekit.Deck("constraints", renderer.to_html(constraints=constraints)) + + return constraints.validate() + + +@workflow +def whylogs_renderers_workflow(min_value: float, max_value: float) -> bool: + df = make_data(n_rows=10) + validated = run_constraints(df=df, min_value=min_value, max_value=max_value) + return validated + + +def test_constraints_passing(): + validated = whylogs_renderers_workflow(min_value=0.0, max_value=1.0) + assert validated is True + + +def test_constraints_failing(): + validated = whylogs_renderers_workflow(min_value=-1.0, max_value=0.0) + assert validated is False + + +def test_summary_drift_report_is_written(): + renderer = WhylogsSummaryDriftRenderer() + new_data = make_data(n_rows=10) + reference_data = make_data(n_rows=100) + + report = renderer.to_html(target_data=new_data, reference_data=reference_data) + assert report is not None + assert isinstance(report, str) + assert "Profile Summary" in report diff --git a/plugins/flytekit-whylogs/tests/test_schema.py b/plugins/flytekit-whylogs/tests/test_schema.py new file mode 100644 index 0000000000..8fffae1c75 --- /dev/null +++ b/plugins/flytekit-whylogs/tests/test_schema.py @@ -0,0 +1,42 @@ +from datetime import datetime + +import pandas as pd +import pytest +import whylogs as why +from whylogs.core import DatasetProfileView + +from flytekit import task, workflow + + +@pytest.fixture +def input_data(): + return pd.DataFrame({"a": [1, 2, 3, 4]}) + + +@task +def whylogs_profiling(data: pd.DataFrame) -> DatasetProfileView: + result = why.log(pandas=data) + return result.view() + + +@task +def fetch_whylogs_datetime(profile_view: DatasetProfileView) -> datetime: + return profile_view.dataset_timestamp + + +@workflow +def whylogs_wf(data: pd.DataFrame) -> datetime: + profile_view = whylogs_profiling(data=data) + return fetch_whylogs_datetime(profile_view=profile_view) + + +def test_task_returns_whylogs_profile_view(input_data): + actual_profile = whylogs_profiling(data=input_data) + assert actual_profile is not None + assert isinstance(actual_profile, DatasetProfileView) + + +def test_profile_view_gets_passed_on_tasks(input_data): + result = whylogs_wf(data=input_data) + assert result is not None + assert isinstance(result, datetime)
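A possible companion test for the transformer, sketched here rather than taken from the plugin, assumes pytest's built-in `tmp_path` fixture and exercises the same `DatasetProfileView.write()` / `DatasetProfileView.read()` round trip that `WhylogsDatasetProfileTransformer.to_literal()` and `to_python_value()` perform through the Flyte blob store:

```python
import pandas as pd
import whylogs as why
from whylogs.core import DatasetProfileView


def test_profile_view_round_trip(tmp_path):
    # Profile a small DataFrame, persist it the way to_literal() does locally,
    # and load it back the way to_python_value() does.
    df = pd.DataFrame({"a": [1, 2, 3, 4]})
    view = why.log(pandas=df).view()

    path = str(tmp_path / "profile.bin")
    view.write(path)
    restored = DatasetProfileView.read(path)

    assert isinstance(restored, DatasetProfileView)
    assert restored.get_column("a") is not None
```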