diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py index c1c48d32b0..0aa38a555e 100644 --- a/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py @@ -11,4 +11,4 @@ SodaCheckTask """ -from .task import SodaCheckConfig, SodaCheckTask \ No newline at end of file +from .task import SodaCheckConfig, SodaCheckTask diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py index a2f27090db..bd593ca9e8 100644 --- a/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py @@ -6,13 +6,17 @@ This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud API credentials to run these scans in an automated fashion within Flyte. """ + +import os +import subprocess from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + +import requests + from flytekit import PythonFunctionTask -from typing import Any, Dict, Callable, Optional from flytekit.configuration import SecretsManager -import requests -import os -import subprocess + # This would be the main task configuration class for Soda.io @dataclass @@ -26,11 +30,13 @@ class SodaCheckConfig: data_source (Optional[str]): Name of the data source in Soda.io to use for the scan. scan_name (Optional[str]): Name of the scan job for organizational purposes. """ + scan_definition: str soda_cloud_api_key: Optional[str] = None data_source: Optional[str] = None # Name of the data source in Soda.io scan_name: Optional[str] = "default_scan" # Name for the scan job + class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): """ A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration. @@ -42,6 +48,7 @@ class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): Attributes: _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte. """ + _TASK_TYPE = "soda_check_task" def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs): @@ -90,9 +97,9 @@ def execute(self, **kwargs) -> Dict[str, Any]: "scan_definition": scan_definition, "data_source": data_source, "scan_name": scan_name, - "api_key": api_key + "api_key": api_key, } - + # Placeholder for API result result = {} @@ -100,11 +107,11 @@ def execute(self, **kwargs) -> Dict[str, Any]: try: response = requests.post(url, json=payload) response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) - + # Assuming the API returns a JSON response result = response.json() except requests.exceptions.RequestException as e: raise RuntimeError(f"API call failed: {e}") - return {"scan_result": result} \ No newline at end of file + return {"scan_result": result} diff --git a/plugins/community/flytekit-soda.io/setup.py b/plugins/community/flytekit-soda.io/setup.py index 094d1d594f..c36b66c94b 100644 --- a/plugins/community/flytekit-soda.io/setup.py +++ b/plugins/community/flytekit-soda.io/setup.py @@ -2,7 +2,11 @@ PLUGIN_NAME = "kfsoda" microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements +plugin_requires = [ + "flytekit>=1.6.1", + "soda-spark", + "requests>=2.25.1", +] # Update as per Soda.io requirements __version__ = "0.0.0+develop" setup( @@ -29,4 +33,4 @@ "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", ], -) \ No newline at end of file +) diff --git a/plugins/community/flytekit-soda.io/tests/test_soda_task.py b/plugins/community/flytekit-soda.io/tests/test_soda_task.py index a2a36c9fe1..c3365e49e8 100644 --- a/plugins/community/flytekit-soda.io/tests/test_soda_task.py +++ b/plugins/community/flytekit-soda.io/tests/test_soda_task.py @@ -1,10 +1,11 @@ import unittest -from unittest.mock import patch, MagicMock -from typing import NamedTuple, Dict +from typing import Dict, NamedTuple +from unittest.mock import MagicMock, patch -from flytekit import task, workflow from flytekitplugins.soda import SodaTask +from flytekit import task, workflow + # Define a NamedTuple to represent the expected output from the SodaTask SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])]) @@ -13,7 +14,10 @@ MOCK_DATA_SOURCE = "mock_data_source" MOCK_SCAN_NAME = "mock_scan_name" MOCK_API_KEY = "mock_api_key" -MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure +MOCK_RESPONSE = { + "scan_result": {"status": "success", "findings": []} +} # Example response structure + # Define a Flyte task to initialize the SodaTask and execute it @task @@ -23,17 +27,19 @@ def setup_soda_task() -> SodaTaskOutput: scan_definition=MOCK_SCAN_DEFINITION, data_source=MOCK_DATA_SOURCE, scan_name=MOCK_SCAN_NAME, - soda_cloud_api_key=MOCK_API_KEY + soda_cloud_api_key=MOCK_API_KEY, ) # Execute the task and return the mock response return soda_task.execute() + # Define a Flyte workflow to test the setup task @workflow def test_soda_workflow() -> SodaTaskOutput: return setup_soda_task() + # Define the test class for the SodaTask plugin class TestSodaTask(unittest.TestCase): @patch("requests.post") @@ -55,9 +61,10 @@ def test_soda_task_execution(self, mock_post): "scan_definition": MOCK_SCAN_DEFINITION, "data_source": MOCK_DATA_SOURCE, "scan_name": MOCK_SCAN_NAME, - "api_key": MOCK_API_KEY - } + "api_key": MOCK_API_KEY, + }, ) + if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()