diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/interval_filtering.py
index 8641b7c14..740264f4d 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/interval_filtering.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/interval_filtering.py
@@ -80,8 +80,6 @@ def filter(self) -> DataFrame:
             self.time_stamp_column_name
         )
 
-        self.df.show()
-
         tolerance_in_ms = None
         if self.tolerance is not None:
             tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_interval_filtering.py
index bf0c8cef5..94a02b1dc 100644
--- a/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_interval_filtering.py
+++ b/tests/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/test_interval_filtering.py
@@ -11,19 +11,28 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 from datetime import datetime
 
 import pytest
+
 from pyspark.sql import SparkSession
 
 from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.interval_filtering import (
     IntervalFiltering,
 )
+from tests.sdk.python.rtdip_sdk.pipelines.logging.test_log_collection import spark
 
 
 @pytest.fixture(scope="session")
 def spark_session():
-    return SparkSession.builder.master("local[2]").appName("test").getOrCreate()
+    spark = (
+        SparkSession.builder.master("local[2]")
+        .appName("CheckValueRangesTest")
+        .getOrCreate()
+    )
+    yield spark
+    spark.stop()
 
 
 def convert_to_datetime(date_time: str):
@@ -336,3 +345,33 @@ def test_interval_detection_date_time_columns(spark_session: SparkSession):
     assert expected_df.columns == actual_df.columns
     assert expected_df.schema == actual_df.schema
     assert expected_df.collect() == actual_df.collect()
+
+
+def test_interval_detection_large_data_set(spark_session: SparkSession):
+    base_path = os.path.dirname(__file__)
+    file_path = os.path.join(base_path, "../../test_data.csv")
+
+    df = spark_session.read.option("header", "true").csv(file_path)
+
+    interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours")
+
+    actual_df = interval_filtering_wrangler.filter()
+    assert actual_df.count() == 25
+
+
+def test_interval_detection_wrong_datatype(spark_session: SparkSession):
+    df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0JR", "invalid_data_type"),
+            ("A2PS64asd.:ZUX09R", "invalid_data_type"),
+            ("A2PS64V0J.:ZUX09R", "invalid_data_type"),
+            ("A2PS64asd.:ZUX09R", "invalid_data_type"),
+            ("A2PS64V0J.:ZUasdX09R", "invalid_data_type"),
+        ],
+        ["TagName", "EventTime"],
+    )
+
+    interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours")
+
+    with pytest.raises(ValueError):
+        interval_filtering_wrangler.filter()