From 187d8e707aa305ed3c6947f9f014ee6cb581e873 Mon Sep 17 00:00:00 2001
From: Amber-Rigg <44523299+Amber-Rigg@users.noreply.github.com>
Date: Fri, 19 Apr 2024 15:28:43 +0100
Subject: [PATCH] Data Quality with Great Expectations (#714)

* Great Expectations code

Signed-off-by: Amber-Rigg

* Refactor GreatExpectations class and remove unused code

Signed-off-by: Amber-Rigg

* Add new Python utility module and update GreatExpectations class

Signed-off-by: Amber-Rigg

* Update to develop branch

Signed-off-by: Amber-Rigg

* Add monitoring module with license headers and structure update

Signed-off-by: Amber-Rigg

* Add data quality monitoring documentation with Great Expectations and PyDeequ

Signed-off-by: Amber-Rigg

* Add data quality monitoring examples to mkdocs

Signed-off-by: Amber-Rigg

* Add tests for data quality components

Signed-off-by: Amber-Rigg

* Update env with pydeequ version

Signed-off-by: Amber-Rigg

* PyDeequ Improved Documentation and Naming

Signed-off-by: Amber-Rigg

* Add data quality monitoring using Great Expectations

Signed-off-by: Amber-Rigg

* Remove deprecated PythonDeequPipeline class and its dependencies

Signed-off-by: Amber-Rigg

* Mocker for Great Expectations

Signed-off-by: Amber-Rigg

* Add init files

Signed-off-by: Amber-Rigg

* Add test init

Signed-off-by: Amber-Rigg

* Update naming for test discovery

Signed-off-by: Amber-Rigg

* Update data quality init

Signed-off-by: Amber-Rigg

* Pytest discovery

Signed-off-by: Amber-Rigg

* Ingest with Maven library installation

Signed-off-by: Amber-Rigg

* Tests for Great Expectations

Signed-off-by: Amber-Rigg

* Refactor build_expectations method signature

Signed-off-by: Amber-Rigg

* Add Maven package

Signed-off-by: Amber-Rigg

* Update structure for monitoring and add examples to documentation

Signed-off-by: Amber-Rigg

* Update SparkSession

Signed-off-by: Amber-Rigg

* Correct referencing in new file structure

Signed-off-by: Amber-Rigg

* Update Init

Signed-off-by: Amber-Rigg

* Add monitoring and Spark modules for SDK

Signed-off-by: Amber-Rigg

* Init for monitoring

Signed-off-by: Amber-Rigg

* Update dependencies and add data quality monitoring

Signed-off-by: Amber-Rigg

* Test env for Spark

Signed-off-by: Amber-Rigg

* Removal of PyDeequ component from branch

Signed-off-by: Amber-Rigg

* Init update

Signed-off-by: Amber-Rigg

* Great Expectations and Databricks integration

Signed-off-by: Amber-Rigg

* Update Interface and testing

Signed-off-by: Amber-Rigg

---------

Signed-off-by: Amber-Rigg
---
 .../spark/data_quality/great_expectations.md  |   5 +
 environment.yml                               |   1 +
 mkdocs.yml                                    |   4 +
 .../pipelines/monitoring/__init__.py          |  14 ++
 .../pipelines/monitoring/interfaces.py        |  20 ++
 .../pipelines/monitoring/spark/__init__.py    |  13 +
 .../monitoring/spark/data_quality/__init__.py |  13 +
 .../great_expectations_data_quality.py        | 238 ++++++++++++++++++
 .../pipelines/monitoring/__init__.py          |  13 +
 .../pipelines/monitoring/spark/__init__.py    |  13 +
 .../monitoring/spark/data_quality/__init__.py |  13 +
 .../test_great_expectations_data_quality.py   | 127 ++++++++++
 12 files changed, 474 insertions(+)
 create mode 100644 docs/sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/interfaces.py
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/great_expectations_data_quality.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_great_expectations_data_quality.py

diff --git a/docs/sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md b/docs/sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md
new file mode 100644
index 000000000..8f26a67bf
--- /dev/null
+++ b/docs/sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md
@@ -0,0 +1,5 @@
+# Examine Data Quality with Great Expectations
+
+Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you to maintain data quality and improve communication about data between teams.
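+
+The snippet below is a minimal, illustrative sketch of the `GreatExpectationsDataQuality` component added here. It assumes an existing `SparkSession` (`spark`) and Spark DataFrame (`df`); the suite, column and context directory names are placeholders.
+
+```python
+from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.great_expectations_data_quality import (
+    GreatExpectationsDataQuality,
+)
+
+# Assumes `spark` and `df` are already defined.
+gx = GreatExpectationsDataQuality(
+    spark=spark,
+    context_root_dir="/dbfs/great_expectations/",
+    df=df,
+    expectation_suite_name="my_expectation_suite",  # placeholder name
+)
+
+validator, suite = gx.create_expectations()
+expectation = gx.build_expectations(
+    "expect_column_values_to_not_be_null",
+    {"column": "column_name", "mostly": 0.75},
+    {"notes": {"format": "markdown", "content": "Null check."}},
+)
+gx.add_expectations(suite, expectation)
+gx.save_expectations(validator)
+```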
+
+::: src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.great_expectations_data_quality
\ No newline at end of file

diff --git a/environment.yml b/environment.yml
index 748d9f3fb..48b8041de 100644
--- a/environment.yml
+++ b/environment.yml
@@ -68,6 +68,7 @@ dependencies:
   - netCDF4==1.6.4
   - black==24.1.0
   - joblib==1.3.2
+  - great-expectations==0.18.8
   - pip:
     - databricks-sdk==0.20.0
    - dependency-injector==4.41.0
diff --git a/mkdocs.yml b/mkdocs.yml
index 8a2589e22..d66a88dc3 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -227,6 +227,10 @@ nav:
           - Azure Key Vault: sdk/code-reference/pipelines/secrets/azure_key_vault.md
         - Deploy:
           - Databricks: sdk/code-reference/pipelines/deploy/databricks.md
+        - Monitoring:
+          - Data Quality:
+            - Great Expectations:
+              - Data Quality Monitoring: sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md
       - Jobs: sdk/pipelines/jobs.md
       - Deploy:
         - Databricks Workflows: sdk/pipelines/deploy/databricks.md
diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
new file mode 100644
index 000000000..17e525274
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .spark.data_quality.great_expectations_data_quality import *
diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/interfaces.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/interfaces.py
new file mode 100644
index 000000000..2c446c5bc
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/monitoring/interfaces.py
@@ -0,0 +1,20 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ..interfaces import PipelineComponentBaseInterface
+
+
+class MonitoringBaseInterface(PipelineComponentBaseInterface):
+    pass
diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
new file mode 100644
index 000000000..5305a429e
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
new file mode 100644
index 000000000..5305a429e
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/great_expectations_data_quality.py b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/great_expectations_data_quality.py
new file mode 100644
index 000000000..f8022e41c
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/great_expectations_data_quality.py
@@ -0,0 +1,238 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import great_expectations as gx
+from pyspark.sql import DataFrame, SparkSession
+from ...interfaces import MonitoringBaseInterface
+from ...._pipeline_utils.models import Libraries, SystemType
+from great_expectations.checkpoint import (
+    Checkpoint,
+)
+from great_expectations.expectations.expectation import (
+    ExpectationConfiguration,
+)
+
+
+class GreatExpectationsDataQuality(MonitoringBaseInterface):
+    """
+    Data quality monitoring using Great Expectations, allowing you to create, manage and validate data quality expectations against a Spark DataFrame.
+
+    Example
+    --------
+    ```python
+    from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.great_expectations_data_quality import GreatExpectationsDataQuality
+    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
+
+    # Not required if using Databricks
+    spark = SparkSessionUtility(config={}).execute()
+
+    df = spark_dataframe
+    context_root_dir = "/dbfs/great_expectations/"
+    expectation_suite_name = "great_expectations_suite_name"
+    df_datasource_name = "my_spark_in_memory_datasource"
+    df_asset_name = "df_asset_name"
+
+    expectation_type = "expect_column_values_to_not_be_null"
+    exception_dict = {
+        "column": "column_name",
+        "mostly": 0.75,
+    }
+    meta_dict = {
+        "notes": {
+            "format": "markdown",
+            "content": "Comment about this expectation.",
+        }
+    }
+
+    # Configure the Great Expectations Data Quality component
+    GX = GreatExpectationsDataQuality(spark, context_root_dir, df, expectation_suite_name, df_datasource_name, df_asset_name)
+
+    validator, suite = GX.create_expectations()
+
+    expectation_configuration = GX.build_expectations(
+        expectation_type, exception_dict, meta_dict
+    )
+
+    GX.add_expectations(suite, expectation_configuration)
+
+    GX.save_expectations(validator)
+
+    GX.display_expectations(suite)
+
+    # Run the data quality check by validating your data against the expectations set in the suite
+    checkpoint_name = "checkpoint_name"
+    run_name_template = "run_name_template"
+    action_list = [
+        {
+            "name": "store_validation_result",
+            "action": {"class_name": "StoreValidationResultAction"},
+        },
+        {"name": "update_data_docs", "action": {"class_name": "UpdateDataDocsAction"}},
+    ]
+
+    checkpoint_result = GX.check(checkpoint_name, run_name_template, action_list)
+
+    print(checkpoint_result)
+    ```
+
+    Parameters:
+        spark (SparkSession): The Spark session.
+        df (DataFrame): Dataframe containing the raw data.
+        context_root_dir (str): The root directory of the Great Expectations project.
+        expectation_suite_name (str): The name of the expectation suite to be created.
+        df_datasource_name (str): The name of the datasource.
+        df_asset_name (str): The name of the asset.
+ """ + + def __init__( + self, + spark: SparkSession, + context_root_dir: str, + df: DataFrame, + expectation_suite_name: str, + df_datasource_name: str = "my_spark_in_memory_datasource", + df_asset_name: str = "df_asset_name", + ) -> None: + self.spark = spark + self.context_root_dir = context_root_dir + self.df = df + self.expectation_suite_name = expectation_suite_name + self.df_datasource_name = df_datasource_name + self.df_asset_name = df_asset_name + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + # Create a new context + def _create_context(self): + """ + Create a new context + Returns: context + """ + context = gx.get_context(context_root_dir=self.context_root_dir) + return context + + # Create a batch request from a dataframe + def _create_batch_request(self): + """ + Create a batch request from a dataframe + Returns: batch_request + """ + context = self._create_context() + + dataframe_datasource = context.sources.add_or_update_spark( + name=self.df_datasource_name, + ) + dataframe_asset = dataframe_datasource.add_dataframe_asset( + name=self.df_asset_name, + dataframe=self.df, + ) + + batch_request = (dataframe_asset).build_batch_request() + return batch_request + + # Create Expectations + + def create_expectations(self): + context = self._create_context() + batch_request = self._create_batch_request() + + suite = context.add_or_update_expectation_suite( + expectation_suite_name=self.expectation_suite_name + ) + validator = context.get_validator( + batch_request=batch_request, + expectation_suite_name=self.expectation_suite_name, + ) + return validator, suite + + def build_expectations( + self, exception_type: str, exception_dict: dict, meta_dict: dict + ): + expectation_configuration = ExpectationConfiguration( + expectation_type=exception_type, kwargs=exception_dict, meta=meta_dict + ) + return expectation_configuration + + def add_expectations(self, suite, expectation_configuration): + suite.add_expectation_configuration( + expectation_configuration=expectation_configuration + ) + + def remove_expectations( + self, suite, expectation_configuration, remove_multiple_matches=True + ): + suite.remove_expectation( + expectation_configuration=expectation_configuration, + match_type="domain", + remove_multiple_matches=remove_multiple_matches, + ) + + def display_expectations(self, suite): + expectation = suite.show_expectations_by_expectation_type() + return expectation + + def save_expectations(self, validator): + validator.save_expectation_suite(discard_failed_expectations=False) + return validator + + # Validate your data + + def check( + self, + checkpoint_name: str, + run_name_template: str, + action_list: list, + ): + """ + Validate your data against set expecations in the suite + Args: + checkpoint_name (str): The name of the checkpoint. + run_name_template (str): The name of the run. + action_list (list): The list of actions to be performed. 
+        Returns: checkpoint_result (dict)
+        """
+        context = self._create_context()
+        batch_request = self._create_batch_request()
+
+        checkpoint = Checkpoint(
+            name=checkpoint_name,
+            run_name_template=run_name_template,
+            data_context=context,
+            batch_request=batch_request,
+            expectation_suite_name=self.expectation_suite_name,
+            action_list=action_list,
+        )
+        context.add_or_update_checkpoint(checkpoint=checkpoint)
+        checkpoint_result = checkpoint.run()
+        return checkpoint_result
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
new file mode 100644
index 000000000..5305a429e
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
new file mode 100644
index 000000000..5305a429e
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
new file mode 100644
index 000000000..5305a429e
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_great_expectations_data_quality.py b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_great_expectations_data_quality.py
new file mode 100644
index 000000000..69218c439
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/monitoring/spark/data_quality/test_great_expectations_data_quality.py
@@ -0,0 +1,127 @@
+from pytest_mock import MockerFixture
+from pyspark.sql import SparkSession
+
+from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.great_expectations_data_quality import (
+    GreatExpectationsDataQuality,
+)
+
+# The component is instantiated with placeholder values; the tests below mock
+# out the Great Expectations context, so no real Spark session is required.
+gx = GreatExpectationsDataQuality(
+    spark=SparkSession,
+    context_root_dir="context_root_dir/test/",
+    df="test_df",
+    expectation_suite_name="expectation_suite_name",
+    df_datasource_name="my_spark_in_memory_datasource",
+    df_asset_name="df_asset_name",
+)
+
+
+def test_create_expectations(mocker: MockerFixture):
+    mock_context = mocker.MagicMock()
+    gx._create_context = mocker.MagicMock(return_value=mock_context)
+    mock_batch_request = mocker.MagicMock()
+    gx._create_batch_request = mocker.MagicMock(return_value=mock_batch_request)
+    mock_suite = mocker.MagicMock()
+    mock_context.add_or_update_expectation_suite.return_value = mock_suite
+    mock_validator = mocker.MagicMock()
+    mock_context.get_validator.return_value = mock_validator
+
+    validator, suite = gx.create_expectations()
+
+    gx._create_context.assert_called_once()
+    gx._create_batch_request.assert_called_once()
+    mock_context.add_or_update_expectation_suite.assert_called_once_with(
+        expectation_suite_name=gx.expectation_suite_name
+    )
+    mock_context.get_validator.assert_called_once_with(
+        batch_request=mock_batch_request,
+        expectation_suite_name=gx.expectation_suite_name,
+    )
+    assert validator == mock_validator
+    assert suite == mock_suite
+
+
+def test_build_expectations():
+    expectation_type = "expect_column_values_to_not_be_null"
+    exception_dict = {
+        "column": "user_id",
+        "mostly": 0.75,
+    }
+    meta_dict = {
+        "notes": {
+            "format": "markdown",
+            "content": "Some clever comment about this expectation. **Markdown** `Supported`",
+        }
+    }
+    expectation_configuration = gx.build_expectations(
+        expectation_type, exception_dict, meta_dict
+    )
+    assert (
+        expectation_configuration.expectation_type
+        == "expect_column_values_to_not_be_null"
+    )
+    assert expectation_configuration.kwargs == {
+        "column": "user_id",
+        "mostly": 0.75,
+    }
+    assert expectation_configuration.meta == {
+        "notes": {
+            "format": "markdown",
+            "content": "Some clever comment about this expectation. **Markdown** `Supported`",
+        }
+    }
+    assert isinstance(expectation_type, str)
+    assert isinstance(exception_dict, dict)
+    assert isinstance(meta_dict, dict)
+
+
+def test_add_expectations(mocker: MockerFixture):
+    mock_suite = mocker.MagicMock()
+    mock_expectation_configuration = mocker.MagicMock()
+    gx.add_expectations(mock_suite, mock_expectation_configuration)
+    mock_suite.add_expectation_configuration.assert_called_once_with(
+        expectation_configuration=mock_expectation_configuration
+    )
+
+
+def test_remove_expectations(mocker: MockerFixture):
+    mock_suite = mocker.MagicMock()
+    mock_expectation_configuration = mocker.MagicMock()
+    gx.remove_expectations(mock_suite, mock_expectation_configuration)
+    mock_suite.remove_expectation.assert_called_once_with(
+        expectation_configuration=mock_expectation_configuration,
+        match_type="domain",
+        remove_multiple_matches=True,
+    )
+
+
+def test_display_expectations(mocker: MockerFixture):
+    mock_suite = mocker.MagicMock()
+    mock_suite.show_expectations_by_expectation_type.return_value = "expectation"
+    expectation = gx.display_expectations(mock_suite)
+    assert expectation == "expectation"
+
+
+def test_save_expectations(mocker: MockerFixture):
+    mock_validator = mocker.MagicMock()
+    validator = gx.save_expectations(mock_validator)
+    mock_validator.save_expectation_suite.assert_called_once_with(
+        discard_failed_expectations=False
+    )
+    assert validator == mock_validator
+
+
+def test_check(mocker: MockerFixture):
+    checkpoint_name = "checkpoint_name"
+    run_name_template = "run_name_template"
+    action_list = ["action_list"]
+
+    mock_context = mocker.MagicMock()
+    gx._create_context = mocker.MagicMock(return_value=mock_context)
+    mock_batch_request = mocker.MagicMock()
+    gx._create_batch_request = mocker.MagicMock(return_value=mock_batch_request)
+
+    assert isinstance(checkpoint_name, str)
+    assert isinstance(run_name_template, str)
+    assert isinstance(action_list, list)
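+
+    # A hedged sketch, not part of the original test: exercise check() end to end
+    # with the Checkpoint class patched out, so no real Great Expectations context
+    # or Spark session is needed. The patch target assumes Checkpoint is resolved
+    # via the module under test, where it is imported at the top level.
+    mock_checkpoint = mocker.MagicMock()
+    mock_checkpoint.run.return_value = "checkpoint_result"
+    mocker.patch(
+        "src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.great_expectations_data_quality.Checkpoint",
+        return_value=mock_checkpoint,
+    )
+
+    checkpoint_result = gx.check(checkpoint_name, run_name_template, action_list)
+
+    mock_context.add_or_update_checkpoint.assert_called_once_with(
+        checkpoint=mock_checkpoint
+    )
+    mock_checkpoint.run.assert_called_once()
+    assert checkpoint_result == "checkpoint_result"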