Skip to content

Commit

Permalink
Linting issues corrected
Browse files Browse the repository at this point in the history
Signed-off-by: 10sharmashivam <[email protected]>
  • Loading branch information
10sharmashivam committed Nov 12, 2024
1 parent d56a438 commit f204dc6
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
SodaCheckTask
"""

from .task import SodaCheckConfig, SodaCheckTask
from .task import SodaCheckConfig, SodaCheckTask
23 changes: 15 additions & 8 deletions plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud
API credentials to run these scans in an automated fashion within Flyte.
"""

import os
import subprocess
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

import requests

from flytekit import PythonFunctionTask
from typing import Any, Dict, Callable, Optional
from flytekit.configuration import SecretsManager
import requests
import os
import subprocess


# This would be the main task configuration class for Soda.io
@dataclass
Expand All @@ -26,11 +30,13 @@ class SodaCheckConfig:
data_source (Optional[str]): Name of the data source in Soda.io to use for the scan.
scan_name (Optional[str]): Name of the scan job for organizational purposes.
"""

scan_definition: str
soda_cloud_api_key: Optional[str] = None
data_source: Optional[str] = None # Name of the data source in Soda.io
scan_name: Optional[str] = "default_scan" # Name for the scan job


class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]):
"""
A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration.
Expand All @@ -42,6 +48,7 @@ class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]):
Attributes:
_TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte.
"""

_TASK_TYPE = "soda_check_task"

def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs):
Expand Down Expand Up @@ -90,21 +97,21 @@ def execute(self, **kwargs) -> Dict[str, Any]:
"scan_definition": scan_definition,
"data_source": data_source,
"scan_name": scan_name,
"api_key": api_key
"api_key": api_key,
}

# Placeholder for API result
result = {}

# Make the API call (using POST method as an example)
try:
response = requests.post(url, json=payload)
response.raise_for_status() # Raise an error for bad responses (4xx or 5xx)

# Assuming the API returns a JSON response
result = response.json()

except requests.exceptions.RequestException as e:
raise RuntimeError(f"API call failed: {e}")

return {"scan_result": result}
return {"scan_result": result}
8 changes: 6 additions & 2 deletions plugins/community/flytekit-soda.io/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

PLUGIN_NAME = "kfsoda"
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"
plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements
plugin_requires = [
"flytekit>=1.6.1",
"soda-spark",
"requests>=2.25.1",
] # Update as per Soda.io requirements
__version__ = "0.0.0+develop"

setup(
Expand All @@ -29,4 +33,4 @@
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
)
23 changes: 15 additions & 8 deletions plugins/community/flytekit-soda.io/tests/test_soda_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest
from unittest.mock import patch, MagicMock
from typing import NamedTuple, Dict
from typing import Dict, NamedTuple
from unittest.mock import MagicMock, patch

from flytekit import task, workflow
from flytekitplugins.soda import SodaTask

from flytekit import task, workflow

# Define a NamedTuple to represent the expected output from the SodaTask
SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])])

Expand All @@ -13,7 +14,10 @@
MOCK_DATA_SOURCE = "mock_data_source"
MOCK_SCAN_NAME = "mock_scan_name"
MOCK_API_KEY = "mock_api_key"
MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure
MOCK_RESPONSE = {
"scan_result": {"status": "success", "findings": []}
} # Example response structure


# Define a Flyte task to initialize the SodaTask and execute it
@task
Expand All @@ -23,17 +27,19 @@ def setup_soda_task() -> SodaTaskOutput:
scan_definition=MOCK_SCAN_DEFINITION,
data_source=MOCK_DATA_SOURCE,
scan_name=MOCK_SCAN_NAME,
soda_cloud_api_key=MOCK_API_KEY
soda_cloud_api_key=MOCK_API_KEY,
)

# Execute the task and return the mock response
return soda_task.execute()


# Define a Flyte workflow to test the setup task
@workflow
def test_soda_workflow() -> SodaTaskOutput:
return setup_soda_task()


# Define the test class for the SodaTask plugin
class TestSodaTask(unittest.TestCase):
@patch("requests.post")
Expand All @@ -55,9 +61,10 @@ def test_soda_task_execution(self, mock_post):
"scan_definition": MOCK_SCAN_DEFINITION,
"data_source": MOCK_DATA_SOURCE,
"scan_name": MOCK_SCAN_NAME,
"api_key": MOCK_API_KEY
}
"api_key": MOCK_API_KEY,
},
)


if __name__ == "__main__":
unittest.main()
unittest.main()

0 comments on commit f204dc6

Please sign in to comment.