Introducing whylogs integration to flytekit (#1104)
* Introducing whylogs integration to flytekit

* Details: It creates a schema for "transforming" whylogs' DatasetProfileView objects to and from the FlyteSchema. This PR also creates two renderers with FlyteDeck, one for the Constraints report and the other for the SummaryDriftReport.

Signed-off-by: murilommen <[email protected]>

* refactoring tests

changes: breaking down single test_schema.py unit test into test_schema + test_renderer, each with their own concerns
Signed-off-by: murilommen <[email protected]>

* styleguide fixes

details: removing extra lines later caught by flake8
Signed-off-by: murilommen <[email protected]>

* add plugin to ci checks

Signed-off-by: murilommen <[email protected]>

* adding requirements files + lint fixes

Signed-off-by: murilommen <[email protected]>

* removing entrypoints from setup.py

Signed-off-by: murilommen <[email protected]>

* adding a python 3.10 build restriction

Signed-off-by: murilommen <[email protected]>

* fixing protobuf's version to < 4

details: whylogs requires protobuf >= 3.15, but flyteidl can't handle protobuf >= 4.0, so adding the upper bound as a restriction to setup.py
Signed-off-by: murilommen <[email protected]>

* lint setup.py

details: ran black on the plugin implementation, which flagged setup.py
Signed-off-by: murilommen <[email protected]>

* fixing comma on setup.py

Signed-off-by: murilommen <[email protected]>
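
The protobuf restriction described in the items above amounts to a half-open version range, `>=3.15,<4.0.0`. A minimal sketch of that check follows, assuming plain dotted numeric versions only; `parse_version` and `satisfies_pin` are hypothetical helper names, and real installers apply the full PEP 440 rules rather than this hand-rolled comparison.

```python
def parse_version(text: str) -> tuple:
    """Turn '3.20.1' into (3, 20, 1), padding short versions with zeros."""
    parts = tuple(int(part) for part in text.split("."))
    return (parts + (0, 0, 0))[:3]


def satisfies_pin(version: str) -> bool:
    """Check the half-open range >=3.15,<4.0.0 (illustrative helper only)."""
    return (3, 15, 0) <= parse_version(version) < (4, 0, 0)


# 3.20.1 is the version pinned in this plugin's requirements.txt.
for candidate in ["3.14.0", "3.15", "3.20.1", "4.0", "4.21.0"]:
    print(candidate, satisfies_pin(candidate))
```

Padding to three components matters here: without it, the tuple `(4, 0)` would compare as smaller than `(4, 0, 0)` and version `4.0` would slip through the upper bound.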
murilommen authored and wild-endeavor committed Aug 2, 2022
1 parent 535c11f commit c984814
Showing 11 changed files with 362 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/pythonbuild.yml
@@ -86,6 +86,7 @@ jobs:
          - flytekit-snowflake
          - flytekit-spark
          - flytekit-sqlalchemy
          - flytekit-whylogs
        exclude:
          # flytekit-modin depends on ray which does not have a 3.10 wheel yet.
          # Issue tracked in https://github.com/ray-project/ray/issues/19116.
@@ -103,6 +104,12 @@ jobs:
            plugin-names: "flytekit-onnx-scikitlearn"
          - python-version: 3.10
            plugin-names: "flytekit-onnx-tensorflow"
          # whylogs-sketching library does not have a 3.10 build yet
          # Issue tracked: https://github.com/whylabs/whylogs/issues/697
          - python-version: 3.10
            plugin-names: "flytekit-whylogs"


    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
57 changes: 57 additions & 0 deletions plugins/flytekit-whylogs/README.md
@@ -0,0 +1,57 @@
# Flytekit whylogs Plugin

whylogs is an open source library for logging any kind of data. With whylogs,
you are able to generate summaries of datasets (called whylogs profiles) which
can be used to:

- Create data constraints to know whether your data looks the way it should
- Quickly visualize key summary statistics about a dataset
- Track changes in a dataset over time

```bash
pip install flytekitplugins-whylogs
```

To generate profiles, you can add a task like the following:

```python
import pandas as pd
import whylogs as ylog
from flytekit import task
from whylogs.core import DatasetProfileView


@task
def profile(df: pd.DataFrame) -> DatasetProfileView:
    result = ylog.log(df)  # Various overloads for different common data types exist
    profile = result.view()
    return profile
```

>**NOTE:** You'll be passing around `DatasetProfileView` from tasks, not `DatasetProfile`.
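
The reason for this is that the plugin registers its type transformer for `DatasetProfileView`: on output the view is written to a local path and uploaded, and on input it is downloaded and read back. The sketch below mimics that round trip with the standard library alone. `FakeProfileView`, `to_blob`, and `from_blob` are hypothetical stand-ins, not flytekit or whylogs APIs, and a temporary local directory plays the role of remote storage.

```python
import json
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FakeProfileView:
    """Hypothetical stand-in for a profile view; NOT the whylogs class."""

    row_count: int

    def write(self, path: str) -> None:
        Path(path).write_text(json.dumps({"row_count": self.row_count}))

    @classmethod
    def read(cls, path: str) -> "FakeProfileView":
        return cls(**json.loads(Path(path).read_text()))


def to_blob(view: FakeProfileView, remote_dir: Path) -> str:
    """Serialize locally, then 'upload' (here: copy) and return the blob URI."""
    with tempfile.TemporaryDirectory() as tmp:
        local_path = Path(tmp) / "profile.bin"
        view.write(str(local_path))
        remote_path = remote_dir / "profile.bin"
        shutil.copy(local_path, remote_path)
    return str(remote_path)


def from_blob(uri: str) -> FakeProfileView:
    """'Download' (here: copy) the blob to a local path, then deserialize it."""
    with tempfile.TemporaryDirectory() as tmp:
        local_path = Path(tmp) / "profile.bin"
        shutil.copy(uri, local_path)
        return FakeProfileView.read(str(local_path))


remote_store = Path(tempfile.mkdtemp())  # plays the role of remote storage
uri = to_blob(FakeProfileView(row_count=4), remote_store)
restored = from_blob(uri)
print(restored)
```

Only the URI travels between the two functions, which is why the object handed from one task to the next has to be something that can be written to and read from a file.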
## Validating Data

A common step in data pipelines is data validation. In `whylogs`, this is done
with constraints. A task can fail when the data in the workflow doesn't conform
to the configured constraints, such as min/max values or data types on
features.

```python
from flytekit import task
from whylogs.core import DatasetProfileView
from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector


@task
def validate_data(profile: DatasetProfileView):
    column = profile.get_column("my_column")
    print(column.to_summary_dict())  # To see available things you can validate against
    builder = ConstraintsBuilder(profile)
    num_constraint = MetricConstraint(
        name="numbers between 0 and 4 only",
        condition=lambda x: x.min > 0 and x.max < 4,
        metric_selector=MetricsSelector(metric_name="distribution", column_name="my_column"),
    )
    builder.add_constraint(num_constraint)
    constraints = builder.build()
    valid = constraints.validate()

    if not valid:
        raise Exception("Invalid data found")
```

Check out our [constraints notebook](https://github.com/whylabs/whylogs/blob/1.0.x/python/examples/basic/MetricConstraints.ipynb) for more examples.
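
As a rough mental model of what the example above checks, a constraint is a named predicate evaluated against a column's summary metric. The plain-Python sketch below is not the whylogs API: `Distribution`, `SimpleConstraint`, `summarize`, and `validate_all` are hypothetical names, and the metric is reduced to `min` and `max` only.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Distribution:
    """Hypothetical, simplified stand-in for a column's distribution metric."""

    min: float
    max: float


@dataclass
class SimpleConstraint:
    """A named predicate over a metric (illustrative only)."""

    name: str
    condition: Callable[[Distribution], bool]


def summarize(values: List[float]) -> Distribution:
    """Reduce raw values to the summary the constraints are checked against."""
    return Distribution(min=min(values), max=max(values))


def validate_all(metric: Distribution, constraints: List[SimpleConstraint]) -> bool:
    """Return True only if every constraint holds for the metric."""
    return all(c.condition(metric) for c in constraints)


metric = summarize([0.5, 1.2, 3.9])
in_range = SimpleConstraint("numbers between 0 and 4 only", lambda x: x.min > 0 and x.max < 4)
too_tight = SimpleConstraint("numbers between 0 and 1 only", lambda x: x.min > 0 and x.max < 1)

print(validate_all(metric, [in_range]))             # True
print(validate_all(metric, [in_range, too_tight]))  # False
```

The predicates run against the summary, not the raw rows, which is why validation can work from a profile alone.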
4 changes: 4 additions & 0 deletions plugins/flytekit-whylogs/flytekitplugins/whylogs/__init__.py
@@ -0,0 +1,4 @@
from .renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
from .schema import WhylogsDatasetProfileTransformer

__all__ = ["WhylogsDatasetProfileTransformer", "WhylogsConstraintsRenderer", "WhylogsSummaryDriftRenderer"]
65 changes: 65 additions & 0 deletions plugins/flytekit-whylogs/flytekitplugins/whylogs/renderer.py
@@ -0,0 +1,65 @@
import whylogs as why
from pandas import DataFrame
from whylogs.core.constraints import Constraints
from whylogs.viz import NotebookProfileVisualizer


class WhylogsSummaryDriftRenderer:
    """
    Creates a whylogs Summary Drift report from two pandas DataFrames. One of them
    is the reference data and the other is the target data, which the report
    compares against the reference.
    """

    @staticmethod
    def to_html(reference_data: DataFrame, target_data: DataFrame) -> str:
        """
        This static method will profile the input data and then generate an HTML report
        with the Summary Drift calculations for all the dataframe's columns

        :param reference_data: The DataFrame that will be the reference for the drift report
        :type reference_data: pandas.DataFrame
        :param target_data: The data to compare against the reference in the Summary Drift report
        :type target_data: pandas.DataFrame
        """

        target_view = why.log(target_data).view()
        reference_view = why.log(reference_data).view()
        viz = NotebookProfileVisualizer()
        viz.set_profiles(target_profile_view=target_view, reference_profile_view=reference_view)
        return viz.summary_drift_report().data


class WhylogsConstraintsRenderer:
    """
    Creates a whylogs Constraints report from a `Constraints` object. Currently our API
    requires the user to have a profiled DataFrame in place to be able to use it. The report
    is rendered as HTML that lets users check which constraints passed or failed.
    An example constraints object definition can be written as follows:

    .. code-block:: python

        profile_view = why.log(df).view()
        builder = ConstraintsBuilder(profile_view)
        num_constraint = MetricConstraint(
            name=f'numbers between {min_value} and {max_value} only',
            condition=lambda x: x.min > min_value and x.max < max_value,
            metric_selector=MetricsSelector(
                metric_name='distribution',
                column_name='sepal_length'
            )
        )
        builder.add_constraint(num_constraint)
        constraints = builder.build()

    Each Constraints object (builder.build() in the example above) can have as many constraints as
    desired. If you want to learn more, check out our docs and examples at https://whylogs.readthedocs.io/
    """

    @staticmethod
    def to_html(constraints: Constraints) -> str:
        viz = NotebookProfileVisualizer()
        report = viz.constraints_report(constraints=constraints)
        return report.data
50 changes: 50 additions & 0 deletions plugins/flytekit-whylogs/flytekitplugins/whylogs/schema.py
@@ -0,0 +1,50 @@
from typing import Type

from whylogs.core import DatasetProfileView

from flytekit import BlobType, FlyteContext
from flytekit.extend import T, TypeEngine, TypeTransformer
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType


class WhylogsDatasetProfileTransformer(TypeTransformer[DatasetProfileView]):
    """
    Transforms whylogs Dataset Profile Views to and from a Schema (typed/untyped)
    """

    _TYPE_INFO = BlobType(format="binary", dimensionality=BlobType.BlobDimensionality.SINGLE)

    def __init__(self):
        super(WhylogsDatasetProfileTransformer, self).__init__("whylogs-profile-transformer", t=DatasetProfileView)

    def get_literal_type(self, t: Type[DatasetProfileView]) -> LiteralType:
        return LiteralType(blob=self._TYPE_INFO)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: DatasetProfileView,
        python_type: Type[DatasetProfileView],
        expected: LiteralType,
    ) -> Literal:
        remote_path = ctx.file_access.get_random_remote_directory()
        local_dir = ctx.file_access.get_random_local_path()
        python_val.write(local_dir)
        ctx.file_access.upload(local_dir, remote_path)
        return Literal(scalar=Scalar(blob=Blob(uri=remote_path, metadata=BlobMetadata(type=self._TYPE_INFO))))

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[DatasetProfileView]) -> T:
        local_dir = ctx.file_access.get_random_local_path()
        ctx.file_access.download(lv.scalar.blob.uri, local_dir)
        return DatasetProfileView.read(local_dir)

    def to_html(
        self, ctx: FlyteContext, python_val: DatasetProfileView, expected_python_type: Type[DatasetProfileView]
    ) -> str:
        pandas_profile = str(python_val.to_pandas().to_html())
        header = str("<h1>Profile View</h1> \n")
        return header + pandas_profile


TypeEngine.register(WhylogsDatasetProfileTransformer())
2 changes: 2 additions & 0 deletions plugins/flytekit-whylogs/requirements.in
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-whylogs
26 changes: 26 additions & 0 deletions plugins/flytekit-whylogs/requirements.txt
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
#
#    pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-whylogs
    # via -r requirements.in
flake8==4.0.1
    # via whylogs
mccabe==0.6.1
    # via flake8
protobuf==3.20.1
    # via
    #   flytekitplugins-whylogs
    #   whylogs
pycodestyle==2.8.0
    # via flake8
pyflakes==2.4.0
    # via flake8
typing-extensions==4.3.0
    # via whylogs
whylogs==1.0.6
    # via flytekitplugins-whylogs
whylogs-sketching==3.4.1.dev2
    # via whylogs
36 changes: 36 additions & 0 deletions plugins/flytekit-whylogs/setup.py
@@ -0,0 +1,36 @@
from setuptools import setup

PLUGIN_NAME = "whylogs"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["protobuf>=3.15,<4.0.0", "whylogs", "whylogs[viz]"]

__version__ = "0.0.0+develop"

setup(
    name=microlib_name,
    version=__version__,
    author="whylabs",
    author_email="[email protected]",
    description="Enable the use of whylogs profiles to be used in flyte tasks to get aggregate statistics about data.",
    url="https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-whylogs",
    long_description=open("README.md").read(),
    long_description_content_type="text/markdown",
    namespace_packages=["flytekitplugins"],
    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
    install_requires=plugin_requires,
    license="apache2",
    python_requires=">=3.7",
    classifiers=[
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.8",
        "Topic :: Scientific/Engineering",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "Topic :: Software Development",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
)
Empty file.
73 changes: 73 additions & 0 deletions plugins/flytekit-whylogs/tests/test_renderer.py
@@ -0,0 +1,73 @@
from typing import Optional

import numpy as np
import pandas as pd
import whylogs as why
from flytekitplugins.whylogs.renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector

import flytekit
from flytekit import task, workflow


@task
def make_data(n_rows: int) -> pd.DataFrame:
    data = {
        "sepal_length": np.random.random_sample(n_rows),
        "sepal_width": np.random.random_sample(n_rows),
        "petal_length": np.random.random_sample(n_rows),
        "petal_width": np.random.random_sample(n_rows),
        "species": np.random.choice(["virginica", "setosa", "versicolor"], n_rows),
        "species_id": np.random.choice([1, 2, 3], n_rows),
    }
    return pd.DataFrame(data)


@task
def run_constraints(df: pd.DataFrame, min_value: Optional[float] = 0.0, max_value: Optional[float] = 4.0) -> bool:
    # This API constraints workflow is very flexible but a bit cumbersome.
    # It will be simplified in the future, so for now we'll stick with injecting
    # a Constraints object to the renderer.
    profile_view = why.log(df).view()
    builder = ConstraintsBuilder(profile_view)
    num_constraint = MetricConstraint(
        name=f"numbers between {min_value} and {max_value} only",
        condition=lambda x: x.min > min_value and x.max < max_value,
        metric_selector=MetricsSelector(metric_name="distribution", column_name="sepal_length"),
    )

    builder.add_constraint(num_constraint)
    constraints = builder.build()

    renderer = WhylogsConstraintsRenderer()
    flytekit.Deck("constraints", renderer.to_html(constraints=constraints))

    return constraints.validate()


@workflow
def whylogs_renderers_workflow(min_value: float, max_value: float) -> bool:
    df = make_data(n_rows=10)
    validated = run_constraints(df=df, min_value=min_value, max_value=max_value)
    return validated


def test_constraints_passing():
    validated = whylogs_renderers_workflow(min_value=0.0, max_value=1.0)
    assert validated is True


def test_constraints_failing():
    validated = whylogs_renderers_workflow(min_value=-1.0, max_value=0.0)
    assert validated is False


def test_summary_drift_report_is_written():
    renderer = WhylogsSummaryDriftRenderer()
    new_data = make_data(n_rows=10)
    reference_data = make_data(n_rows=100)

    report = renderer.to_html(target_data=new_data, reference_data=reference_data)
    assert report is not None
    assert isinstance(report, str)
    assert "Profile Summary" in report
42 changes: 42 additions & 0 deletions plugins/flytekit-whylogs/tests/test_schema.py
@@ -0,0 +1,42 @@
from datetime import datetime

import pandas as pd
import pytest
import whylogs as why
from whylogs.core import DatasetProfileView

from flytekit import task, workflow


@pytest.fixture
def input_data():
    return pd.DataFrame({"a": [1, 2, 3, 4]})


@task
def whylogs_profiling(data: pd.DataFrame) -> DatasetProfileView:
    result = why.log(pandas=data)
    return result.view()


@task
def fetch_whylogs_datetime(profile_view: DatasetProfileView) -> datetime:
    return profile_view.dataset_timestamp


@workflow
def whylogs_wf(data: pd.DataFrame) -> datetime:
    profile_view = whylogs_profiling(data=data)
    return fetch_whylogs_datetime(profile_view=profile_view)


def test_task_returns_whylogs_profile_view(input_data):
    actual_profile = whylogs_profiling(data=input_data)
    assert actual_profile is not None
    assert isinstance(actual_profile, DatasetProfileView)


def test_profile_view_gets_passed_on_tasks(input_data):
    result = whylogs_wf(data=input_data)
    assert result is not None
    assert isinstance(result, datetime)
