diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py
new file mode 100644
index 000000000..b147c47d6
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py
@@ -0,0 +1,174 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import timedelta
+
+import pandas as pd
+from pyspark.sql import functions as F
+from pyspark.sql import SparkSession
+from pyspark.sql import DataFrame
+
+from ...._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import WranglerBaseInterface
+
+
+class IntervalFiltering(WranglerBaseInterface):
+    """
+    Cleanses a DataFrame by removing rows that fall inside the specified interval window
+    relative to the previously kept row, so that at most one row is retained per interval.
+
+    Example:
+        interval_filtering = IntervalFiltering(spark, df, 1, "hours", "EventTime")
+        filtered_df = interval_filtering.filter()
+
+    Parameters:
+        spark (SparkSession): A SparkSession object.
+        df (DataFrame): PySpark DataFrame to be filtered.
+        interval (int): The interval length for cleansing.
+        interval_unit (str): 'days', 'hours', 'minutes', 'seconds' or 'milliseconds' to specify the unit of the interval.
+        time_stamp_column_name (str): Optional; name of the timestamp column to filter on. Defaults to 'EventTime'.
+        tolerance (int): Optional; tolerance, in the same unit as the interval, by which a row's gap to the previously kept row may fall short of the interval and still be kept.
+    """
+
+    """ Default time stamp column name if not set in the constructor """
+    DEFAULT_TIME_STAMP_COLUMN_NAME: str = "EventTime"
+
+    def __init__(
+        self,
+        spark: SparkSession,
+        df: DataFrame,
+        interval: int,
+        interval_unit: str,
+        time_stamp_column_name: str = None,
+        tolerance: int = None,
+    ) -> None:
+        self.spark = spark
+        self.df = df
+        self.interval = interval
+        self.interval_unit = interval_unit
+        self.tolerance = tolerance
+        if time_stamp_column_name is None:
+            self.time_stamp_column_name = self.DEFAULT_TIME_STAMP_COLUMN_NAME
+        else:
+            self.time_stamp_column_name = time_stamp_column_name
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    def convert_column_to_timestamp(self) -> DataFrame:
+        try:
+            return self.df.withColumn(
+                self.time_stamp_column_name, F.to_timestamp(self.time_stamp_column_name)
+            )
+        except Exception as e:
+            raise ValueError(
+                f"Error converting column {self.time_stamp_column_name} to timestamp: {e}"
+            )
+
+    def get_time_delta(self, value: int) -> timedelta:
+        if self.interval_unit == "minutes":
+            return timedelta(minutes=value)
+        elif self.interval_unit == "days":
+            return timedelta(days=value)
+        elif self.interval_unit == "hours":
+            return timedelta(hours=value)
+        elif self.interval_unit == "seconds":
+            return timedelta(seconds=value)
+        elif self.interval_unit == "milliseconds":
+            return timedelta(milliseconds=value)
+        else:
+            raise ValueError(
+                "interval_unit must be either 'days', 'hours', 'minutes', 'seconds' or 'milliseconds'"
+            )
+
+    def check_if_outside_of_interval(
+        self,
+        current_time_stamp: pd.Timestamp,
+        last_time_stamp: pd.Timestamp,
+        time_delta_in_ms: float,
+        tolerance_in_ms: float,
+    ) -> bool:
+        if tolerance_in_ms is None:
+            return (
+                (current_time_stamp - last_time_stamp).total_seconds() * 1000
+            ) >= time_delta_in_ms
+        else:
+            return (
+                (current_time_stamp - last_time_stamp).total_seconds() * 1000
+            ) + tolerance_in_ms >= time_delta_in_ms
+
+    def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str:
+        try:
+            return time_stamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+        except Exception as e:
+            raise ValueError(f"Error converting timestamp to string: {e}")
+
+    def filter(self) -> DataFrame:
+        """
+        Filters the DataFrame based on the interval.
+        """
+
+        if self.time_stamp_column_name not in self.df.columns:
+            raise ValueError(
+                f"Column {self.time_stamp_column_name} not found in the DataFrame."
+            )
+
+        original_schema = self.df.schema
+        self.df = self.convert_column_to_timestamp().orderBy(
+            self.time_stamp_column_name
+        )
+
+        tolerance_in_ms = None
+        if self.tolerance is not None:
+            tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000
+
+        time_delta_in_ms = self.get_time_delta(self.interval).total_seconds() * 1000
+
+        rows = self.df.collect()
+        last_time_stamp = rows[0][self.time_stamp_column_name]
+        first_row = rows[0].asDict()
+        first_row[self.time_stamp_column_name] = self.format_date_time_to_string(
+            first_row[self.time_stamp_column_name]
+        )
+
+        cleansed_df = [first_row]
+
+        for i in range(1, len(rows)):
+            current_row = rows[i]
+            current_time_stamp = current_row[self.time_stamp_column_name]
+
+            if self.check_if_outside_of_interval(
+                current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms
+            ):
+                current_row_dict = current_row.asDict()
+                current_row_dict[self.time_stamp_column_name] = (
+                    self.format_date_time_to_string(
+                        current_row_dict[self.time_stamp_column_name]
+                    )
+                )
+                cleansed_df.append(current_row_dict)
+                last_time_stamp = current_time_stamp
+
+        result_df = self.spark.createDataFrame(cleansed_df, schema=original_schema)
+
+        return result_df
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py
new file mode 100644
index 000000000..7ad4944d9
--- /dev/null
+++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py
@@ -0,0 +1,304 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from pyspark.sql import SparkSession
+from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import (
+    IntervalFiltering,
+)
+
+
+@pytest.fixture(scope="session")
+def spark_session():
+    return SparkSession.builder.master("local[2]").appName("test").getOrCreate()
+
+
+def test_interval_detection_easy(spark_session: SparkSession):
+    expected_df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    interval_filtering_wrangler = IntervalFiltering(
+        spark_session, df, 1, "seconds", "EventTime"
+    )
+    actual_df = interval_filtering_wrangler.filter()
+
+    assert expected_df.columns == actual_df.columns
+    assert expected_df.schema == actual_df.schema
+    assert expected_df.collect() == actual_df.collect()
+
+
+def test_interval_detection_easy_unordered(spark_session: SparkSession):
+    expected_df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
+        ],
+        ["TagName", "EventTime", "Status", "Value"],
+    )
+
+    interval_filtering_wrangler = IntervalFiltering(
+        spark_session, df, 1, "seconds", "EventTime"
+    )
+    actual_df = interval_filtering_wrangler.filter()
+
+    assert expected_df.columns == actual_df.columns
+    assert expected_df.schema == actual_df.schema
+    assert expected_df.collect() == actual_df.collect()
+
+
+def test_interval_detection_milliseconds(spark_session: SparkSession):
+    expected_df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0JR", "2024-01-02 20:03:46.000"),
+            ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"),
+            ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"),
+        ],
+        ["TagName", "Time"],
+    )
+
+    df = spark_session.createDataFrame(
+        [
+            ("A2PS64V0JR", "2024-01-02 20:03:46.000"),
+            ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"),
+            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.025"),
+            ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"), + ], + ["TagName", "Time"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 10, "milliseconds", "Time" + ) + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + + +def test_interval_detection_minutes(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"), + ], + ["TagName", "Time"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"), + ], + ["TagName", "Time"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 3, "minutes", "Time" + ) + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + + +def test_interval_detection_hours(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours") + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + + +def test_interval_detection_days(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-03 21:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-04 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2028-01-01 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-03 21:03:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 21:03:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-04 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2028-01-01 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "days") + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + + +def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + 
("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "hours", "Time" + ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + + +def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "years", "EventTime" + ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + + +def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-09-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "minutes", "EventTime" + ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + + +def test_interval_tolerance(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:47.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:52.000", "Good", "0.129999995"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:46.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:47.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:51.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:52.000", "Good", "0.129999995"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 3, "seconds", "EventTime", 1 + ) + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect()