From 366360dfcf69ced3a40700a7df89ee8f1fb5ab87 Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Mon, 4 Nov 2024 16:37:21 +0100
Subject: [PATCH 01/13] #22: Initial commit

Signed-off-by: Dominik Hoffmann
---
 .../data_wranglers/spark/data_quality/intervall_detection.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py

diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
new file mode 100644
index 000000000..e69de29bb

From 11c288b151fb7c5c2436807093df27ea3dada88b Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Tue, 5 Nov 2024 08:40:30 +0100
Subject: [PATCH 02/13] #22: Added boilerplate and test class

Signed-off-by: Dominik Hoffmann
---
 .../spark/data_quality/intervall_detection.py | 57 +++++++++++++++++++
 .../data_quality/test_intervall_detection.py | 0
 2 files changed, 57 insertions(+)
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py

diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
index e69de29bb..9c59450fb 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
@@ -0,0 +1,57 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from pandas import DataFrame
+
+from rtdip_sdk.pipelines._pipeline_utils.models import SystemType, Libraries
+from ...interfaces import WranglerBaseInterface
+
+class IntervallDetection(WranglerBaseInterface):
+    """
+    The Intervall Detection cleanses a PySpark DataFrame from entries
+
+
+
+    Parameters:
+        spark (SparkSession): A SparkSession object.
+        df (DataFrame): Dataframe containing the raw data.
+        column_names (list[str]): The names of the columns to be filtered (currently only one column is supported).
+        k_value (float): The number of deviations to build the threshold.
+        use_median (book): If True the median and the median absolute deviation (MAD) are used, instead of the mean and standard deviation.
+    """
+    df: DataFrame
+
+
+    def __init__(self, df: DataFrame) -> None:
+        self.df = df
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+
+
+
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py
new file mode 100644
index 000000000..e69de29bb

From 8bff78cab9af850eb9b11d059d076e10f18a0a79 Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Tue, 5 Nov 2024 08:51:14 +0100
Subject: [PATCH 03/13] #22: Updated description and added parameter

Signed-off-by: Dominik Hoffmann
---
 .../spark/data_quality/intervall_detection.py | 25 +++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
index 9c59450fb..69651e7b2 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
@@ -11,29 +11,28 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from pandas import DataFrame
-
-from rtdip_sdk.pipelines._pipeline_utils.models import SystemType, Libraries
+from pyspark.sql import DataFrame, SparkSession
+from ...._pipeline_utils.models import Libraries, SystemType
 from ...interfaces import WranglerBaseInterface
 
 class IntervallDetection(WranglerBaseInterface):
     """
-    The Intervall Detection cleanses a PySpark DataFrame from entries
+    The Intervall Detection cleanses a PySpark DataFrame from entries specified by a passed interval.
 
 
 
     Parameters:
         spark (SparkSession): A SparkSession object.
-        df (DataFrame): Dataframe containing the raw data.
-        column_names (list[str]): The names of the columns to be filtered (currently only one column is supported).
-        k_value (float): The number of deviations to build the threshold.
-        use_median (book): If True the median and the median absolute deviation (MAD) are used, instead of the mean and standard deviation.
+        df (DataFrame): PySpark DataFrame to be converted
+        interval (float): The interval to be used for the detection in seconds
+
     """
-    df: DataFrame
 
 
-    def __init__(self, df: DataFrame) -> None:
+    def __init__(self, spark: SparkSession, df: DataFrame, interval: float) -> None:
+        self.spark = spark
         self.df = df
+        self.interval = interval
 
     @staticmethod
     def system_type():
@@ -52,6 +51,12 @@ def libraries():
     def settings() -> dict:
         return {}
 
+    def filter(self) -> DataFrame:
+        """
+        Filters the DataFrame based on the interval
+        """
+        return self.df.filter(f"timestamp % {self.interval} == 0")
+
 
 
 

From 4f2a646384fb40e1851c01c6fcc89ff665dc5ef7 Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Tue, 5 Nov 2024 15:20:00 +0100
Subject: [PATCH 04/13] #22: implemented first steps of interval filtering

Signed-off-by: Dominik Hoffmann
---
 .../spark/data_quality/interval_filtering.py | 127 ++++++++++++++++++
 .../spark/data_quality/intervall_detection.py | 62 ---------
 .../data_quality/test_interval_filtering.py | 63 +++++++++
 .../data_quality/test_intervall_detection.py | 0
 4 files changed, 190 insertions(+), 62 deletions(-)
 create mode 100644 src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py
 delete mode 100644 src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py
 create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py
 delete mode 100644 tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py

diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py
new file mode 100644
index 000000000..0cc66f433
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py
@@ -0,0 +1,127 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import timedelta
+
+import pandas as pd
+from pyspark.sql import functions as F
+from pyspark.sql import DataFrame, SparkSession
+
+from ...._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import WranglerBaseInterface
+
+class IntervalFiltering(WranglerBaseInterface):
+    """
+    Cleanses a DataFrame by removing rows outside a specified interval window.
+    Example:
+
+
+    Parameters:
+        spark (SparkSession): A SparkSession object.
+        df (DataFrame): PySpark DataFrame to be converted
+        interval (int): The interval length for cleansing.
+        interval_unit (str): 'hours', 'minutes', 'seconds' or 'milliseconds' to specify the unit of the interval.
+ """ + + """ Default time stamp column name if not set in the constructor """ + DEFAULT_TIME_STAMP_COLUMN_NAME: str = "EventTime" + + def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_unit: str, time_stamp_column_name: str = None) -> None: + self.spark = spark + self.df = df + self.interval = interval + self.interval_unit = interval_unit + if time_stamp_column_name is None: + self.time_stamp_column_name = self.DEFAULT_TIME_STAMP_COLUMN_NAME + else: self.time_stamp_column_name = time_stamp_column_name + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + + def convert_column_to_timestamp(self) -> DataFrame: + try: + return self.df.withColumn(self.time_stamp_column_name, F.to_timestamp(self.time_stamp_column_name)) + except Exception as e: + raise ValueError(f"Error converting column {self.time_stamp_column_name} to timestamp: {e}") + + def get_time_delta(self) -> timedelta: + if self.interval_unit == 'minutes': + return timedelta(minutes = self.interval) + elif self.interval_unit == 'days': + return timedelta(days = self.interval) + elif self.interval_unit == 'hours': + return timedelta(hours = self.interval) + elif self.interval_unit == 'seconds': + return timedelta(seconds = self.interval) + elif self.interval_unit == 'milliseconds': + return timedelta(milliseconds = self.interval) + else: + raise ValueError("interval_unit must be either 'seconds' or 'milliseconds'") + + def filter(self) -> DataFrame: + """ + Filters the DataFrame based on the interval + """ + + if self.time_stamp_column_name not in self.df.columns: + raise ValueError(f"Column {self.time_stamp_column_name} not found in the DataFrame.") + + self.df = self.convert_column_to_timestamp() + + time_delta = self.get_time_delta() + + rows = self.df.collect() + cleansed_df = [rows[0]] + + + + last_time_stamp = rows[0][self.time_stamp_column_name] + + + for i in range(1, len(rows)): + current_row = rows[i] + current_time_stamp = current_row[self.time_stamp_column_name] + + if ((last_time_stamp - last_time_stamp).total_seconds()) >= time_delta.total_seconds(): + print(current_row) + cleansed_df.append(current_row) + last_time_stamp = current_time_stamp + + print(cleansed_df) + + + return self.df + + + + + + + + + diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py deleted file mode 100644 index 69651e7b2..000000000 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/intervall_detection.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2022 RTDIP -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from pyspark.sql import DataFrame, SparkSession -from ...._pipeline_utils.models import Libraries, SystemType -from ...interfaces import WranglerBaseInterface - -class IntervallDetection(WranglerBaseInterface): - """ - The Intervall Detection cleanses a PySpark DataFrame from entries specified by a passed interval. - - - - Parameters: - spark (SparkSession): A SparkSession object. - df (DataFrame): PySpark DataFrame to be converted - interval (float): The interval to be used for the detection in seconds - - """ - - - def __init__(self, spark: SparkSession, df: DataFrame, interval: float) -> None: - self.spark = spark - self.df = df - self.interval = interval - - @staticmethod - def system_type(): - """ - Attributes: - SystemType (Environment): Requires PYSPARK - """ - return SystemType.PYSPARK - - @staticmethod - def libraries(): - libraries = Libraries() - return libraries - - @staticmethod - def settings() -> dict: - return {} - - def filter(self) -> DataFrame: - """ - Filters the DataFrame based on the interval - """ - return self.df.filter(f"timestamp % {self.interval} == 0") - - - - diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py new file mode 100644 index 000000000..8dcf0ca55 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -0,0 +1,63 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pytest + +from pyspark.sql import SparkSession +from pyspark.sql.dataframe import DataFrame + +from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering +from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.duplicate_detection import ( + DuplicateDetection, +) + + +@pytest.fixture(scope="session") +def spark_session(): + return SparkSession.builder.master("local[2]").appName("test").getOrCreate() + + +def test_interval_detection(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "milliseconds") + actual_df = interval_filtering_wrangler.filter() + + assert isinstance(actual_df, DataFrame) + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_intervall_detection.py deleted file mode 100644 index e69de29bb..000000000 From 3a41d104a144c71286216f93f33291019b43a328 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Wed, 6 Nov 2024 16:32:00 +0100 Subject: [PATCH 05/13] #22: Returned dataframe. Need to pass also timestamp format. 
Signed-off-by: Dominik Hoffmann --- .../spark/data_quality/interval_filtering.py | 18 ++++++++++++++---- .../data_quality/test_interval_filtering.py | 8 +++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index 0cc66f433..767aec9e8 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -106,16 +106,26 @@ def filter(self) -> DataFrame: for i in range(1, len(rows)): current_row = rows[i] current_time_stamp = current_row[self.time_stamp_column_name] + if ((last_time_stamp - current_time_stamp).total_seconds()) >= time_delta.total_seconds(): + - if ((last_time_stamp - last_time_stamp).total_seconds()) >= time_delta.total_seconds(): - print(current_row) cleansed_df.append(current_row) last_time_stamp = current_time_stamp - print(cleansed_df) + + # Create Dataframe from cleansed data + result_df = pd.DataFrame(cleansed_df) + + # rename the columns back to original + column_names = self.df.columns + result_df.columns = column_names + + # Convert Dataframe time_stamp column back to string + result_df[self.time_stamp_column_name] = result_df[self.time_stamp_column_name].dt.strftime('%Y-%m-%d %H:%M:%S.%f') - return self.df + print(result_df) + return result_df diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 8dcf0ca55..cc2bbf932 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -14,12 +14,10 @@ import pytest from pyspark.sql import SparkSession -from pyspark.sql.dataframe import DataFrame +from pyspark.sql import DataFrame from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering -from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.duplicate_detection import ( - DuplicateDetection, -) + @pytest.fixture(scope="session") @@ -53,7 +51,7 @@ def test_interval_detection(spark_session: SparkSession): ["TagName", "EventTime", "Status", "Value"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "milliseconds") + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime") actual_df = interval_filtering_wrangler.filter() assert isinstance(actual_df, DataFrame) From 396b49f5de9b59bca1340990d260f607575a64f4 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 15:48:38 +0100 Subject: [PATCH 06/13] #22: Expanded test cases to cover all units and multiple intervals Signed-off-by: Dominik Hoffmann --- .../data_quality/test_interval_filtering.py | 134 ++++++++++++++++-- 1 file changed, 122 insertions(+), 12 deletions(-) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index cc2bbf932..6644e7dfa 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ 
b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest +from hyperframe.frame import DataFrame from pyspark.sql import SparkSession -from pyspark.sql import DataFrame - +from pyspark.sql import DataFrame as PySparkDataFrame from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering @@ -25,28 +25,55 @@ def spark_session(): return SparkSession.builder.master("local[2]").appName("test").getOrCreate() -def test_interval_detection(spark_session: SparkSession): +def test_interval_detection_easy(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ - ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), ], ["TagName", "EventTime", "Status", "Value"], ) df = spark_session.createDataFrame( [ - ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime") + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_easy_unordered(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + df = spark_session.createDataFrame( + [ ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 
"0.340000004"), ], ["TagName", "EventTime", "Status", "Value"], ) @@ -54,7 +81,90 @@ def test_interval_detection(spark_session: SparkSession): interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime") actual_df = interval_filtering_wrangler.filter() - assert isinstance(actual_df, DataFrame) + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_milliseconds(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"), + ], + ["TagName", "Time"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.025"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"), + ], + ["TagName", "Time"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 10, "milliseconds", "Time") + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_minutes(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"), + + ], + ["TagName", "Time"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"), + ], + ["TagName", "Time"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 3, "minutes", "Time") + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_hours(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours" ) + actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema From 9ec9b45595cdf07c2a6031123244f794c320b1e5 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 15:49:19 +0100 Subject: [PATCH 07/13] #22: Fixed logic and unit mismatches Signed-off-by: Dominik Hoffmann --- .../spark/data_quality/interval_filtering.py | 40 +++++++++---------- 1 file changed, 18 
insertions(+), 22 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index 767aec9e8..a480a4753 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -14,8 +14,10 @@ from datetime import timedelta import pandas as pd +from databricks.sqlalchemy.test_local.conftest import schema from pyspark.sql import functions as F -from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import SparkSession +from pyspark.sql import DataFrame from ...._pipeline_utils.models import Libraries, SystemType from ...interfaces import WranglerBaseInterface @@ -36,6 +38,7 @@ class IntervalFiltering(WranglerBaseInterface): """ Default time stamp column name if not set in the constructor """ DEFAULT_TIME_STAMP_COLUMN_NAME: str = "EventTime" + def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_unit: str, time_stamp_column_name: str = None) -> None: self.spark = spark self.df = df @@ -83,6 +86,9 @@ def get_time_delta(self) -> timedelta: else: raise ValueError("interval_unit must be either 'seconds' or 'milliseconds'") + def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str: + return time_stamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + def filter(self) -> DataFrame: """ Filters the DataFrame based on the interval @@ -91,40 +97,30 @@ def filter(self) -> DataFrame: if self.time_stamp_column_name not in self.df.columns: raise ValueError(f"Column {self.time_stamp_column_name} not found in the DataFrame.") - self.df = self.convert_column_to_timestamp() + original_schema = self.df.schema + self.df = self.convert_column_to_timestamp().orderBy(self.time_stamp_column_name) - time_delta = self.get_time_delta() + time_delta_in_ms = self.get_time_delta().total_seconds() * 1000 rows = self.df.collect() - cleansed_df = [rows[0]] - - - last_time_stamp = rows[0][self.time_stamp_column_name] + first_row = rows[0].asDict() + first_row[self.time_stamp_column_name] = self.format_date_time_to_string(first_row[self.time_stamp_column_name]) + cleansed_df = [first_row] for i in range(1, len(rows)): current_row = rows[i] current_time_stamp = current_row[self.time_stamp_column_name] - if ((last_time_stamp - current_time_stamp).total_seconds()) >= time_delta.total_seconds(): - - cleansed_df.append(current_row) + if ((current_time_stamp - last_time_stamp).total_seconds() * 1000) >= time_delta_in_ms: + current_row_dict = current_row.asDict() + current_row_dict[self.time_stamp_column_name] = self.format_date_time_to_string(current_row_dict[self.time_stamp_column_name]) + cleansed_df.append(current_row_dict) last_time_stamp = current_time_stamp + result_df = self.spark.createDataFrame(cleansed_df, schema= original_schema) - # Create Dataframe from cleansed data - result_df = pd.DataFrame(cleansed_df) - - # rename the columns back to original - column_names = self.df.columns - result_df.columns = column_names - - # Convert Dataframe time_stamp column back to string - result_df[self.time_stamp_column_name] = result_df[self.time_stamp_column_name].dt.strftime('%Y-%m-%d %H:%M:%S.%f') - - - print(result_df) return result_df From c034b3184a05d1dd9bd4d8d6f2b2e9336ce94941 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:01:49 +0100 Subject: [PATCH 08/13] #22: Fixed uncaught 
exception Signed-off-by: Dominik Hoffmann --- .../data_wranglers/spark/data_quality/interval_filtering.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index a480a4753..18caf3eb1 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -15,6 +15,7 @@ import pandas as pd from databricks.sqlalchemy.test_local.conftest import schema +from exceptiongroup import catch from pyspark.sql import functions as F from pyspark.sql import SparkSession from pyspark.sql import DataFrame @@ -87,7 +88,10 @@ def get_time_delta(self) -> timedelta: raise ValueError("interval_unit must be either 'seconds' or 'milliseconds'") def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str: - return time_stamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + try: + return time_stamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + except Exception as e: + raise ValueError(f"Error converting timestamp to string: {e}") def filter(self) -> DataFrame: """ From c4752c71c5d3b96d34e47a77639f3483a67c844e Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:02:10 +0100 Subject: [PATCH 09/13] #22: Added test cases for thrown exceptions Signed-off-by: Dominik Hoffmann --- .../data_quality/test_interval_filtering.py | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 6644e7dfa..4e5ad4d09 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import pytest -from hyperframe.frame import DataFrame from pyspark.sql import SparkSession from pyspark.sql import DataFrame as PySparkDataFrame @@ -169,3 +168,55 @@ def test_interval_detection_hours(spark_session: SparkSession): assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours", "Time" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + +def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "years", "EventTime" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + +def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-09-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "minutes", "EventTime" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + From 0bcd73efbdc455a7b57b203d1d668cf9bad2c62c Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:03:35 +0100 Subject: [PATCH 10/13] #22: Fixed exception print Signed-off-by: Dominik Hoffmann --- .../data_wranglers/spark/data_quality/interval_filtering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index 18caf3eb1..a7a768064 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -85,7 +85,7 @@ def get_time_delta(self) -> timedelta: elif self.interval_unit == 'milliseconds': return timedelta(milliseconds = self.interval) else: - raise ValueError("interval_unit must be either 'seconds' or 'milliseconds'") + raise ValueError("interval_unit must be either 'days', 'hours', 'minutes', 'seconds' or 'milliseconds'") def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str: try: From f02cfb5db9122c44c4dc0a4a4234883e11492d69 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:08:43 +0100 Subject: [PATCH 11/13] #22: Added years test case Signed-off-by: Dominik Hoffmann --- 
.../data_quality/test_interval_filtering.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 4e5ad4d09..1379d7929 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -169,6 +169,35 @@ def test_interval_detection_hours(spark_session: SparkSession): assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() +def test_interval_detection_days(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-03 21:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-04 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2028-01-01 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-03 21:03:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 21:03:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-04 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2028-01-01 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "days" ) + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession): df = spark_session.createDataFrame( [ From 5375d50f4fe866d049c37f59c46ab842e1ec1588 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:24:18 +0100 Subject: [PATCH 12/13] #22: Implemented tolerance Signed-off-by: Dominik Hoffmann --- .../spark/data_quality/interval_filtering.py | 31 +++++++++++++------ .../data_quality/test_interval_filtering.py | 31 +++++++++++++++++++ 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index a7a768064..e0b353772 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -40,11 +40,12 @@ class IntervalFiltering(WranglerBaseInterface): DEFAULT_TIME_STAMP_COLUMN_NAME: str = "EventTime" - def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_unit: str, time_stamp_column_name: str = None) -> None: + def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_unit: str, time_stamp_column_name: str = None, tolerance: int = None) -> None: self.spark = spark self.df = df self.interval = interval self.interval_unit = interval_unit + self.tolerance = tolerance if time_stamp_column_name is None: self.time_stamp_column_name = self.DEFAULT_TIME_STAMP_COLUMN_NAME else: self.time_stamp_column_name = time_stamp_column_name @@ -73,20 +74,26 @@ def convert_column_to_timestamp(self) -> DataFrame: except Exception as e: raise ValueError(f"Error converting column {self.time_stamp_column_name} to timestamp: 
{e}") - def get_time_delta(self) -> timedelta: + def get_time_delta(self, value: int) -> timedelta: if self.interval_unit == 'minutes': - return timedelta(minutes = self.interval) + return timedelta(minutes = value) elif self.interval_unit == 'days': - return timedelta(days = self.interval) + return timedelta(days = value) elif self.interval_unit == 'hours': - return timedelta(hours = self.interval) + return timedelta(hours = value) elif self.interval_unit == 'seconds': - return timedelta(seconds = self.interval) + return timedelta(seconds = value) elif self.interval_unit == 'milliseconds': - return timedelta(milliseconds = self.interval) + return timedelta(milliseconds = value) else: raise ValueError("interval_unit must be either 'days', 'hours', 'minutes', 'seconds' or 'milliseconds'") + def check_if_outside_of_interval(self, current_time_stamp: pd.Timestamp, last_time_stamp: pd.Timestamp, time_delta_in_ms: float, tolerance_in_ms: float) -> bool: + if tolerance_in_ms is None: + return ((current_time_stamp - last_time_stamp).total_seconds() * 1000) >= time_delta_in_ms + else: + return ((current_time_stamp - last_time_stamp).total_seconds() * 1000) + tolerance_in_ms >= time_delta_in_ms + def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str: try: return time_stamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] @@ -104,7 +111,13 @@ def filter(self) -> DataFrame: original_schema = self.df.schema self.df = self.convert_column_to_timestamp().orderBy(self.time_stamp_column_name) - time_delta_in_ms = self.get_time_delta().total_seconds() * 1000 + tolerance_in_ms = None + if self.tolerance is not None: + tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000 + print(tolerance_in_ms) + + + time_delta_in_ms = self.get_time_delta(self.interval).total_seconds() * 1000 rows = self.df.collect() last_time_stamp = rows[0][self.time_stamp_column_name] @@ -117,7 +130,7 @@ def filter(self) -> DataFrame: current_row = rows[i] current_time_stamp = current_row[self.time_stamp_column_name] - if ((current_time_stamp - last_time_stamp).total_seconds() * 1000) >= time_delta_in_ms: + if self.check_if_outside_of_interval(current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms): current_row_dict = current_row.asDict() current_row_dict[self.time_stamp_column_name] = self.format_date_time_to_string(current_row_dict[self.time_stamp_column_name]) cleansed_df.append(current_row_dict) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 1379d7929..8205dcad5 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -249,3 +249,34 @@ def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): with pytest.raises(ValueError): interval_filtering_wrangler.filter() +def test_interval_tolerance(spark_session: SparkSession): + expected_df = spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:47.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:52.000", "Good", "0.129999995"), + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + df = 
spark_session.createDataFrame( + [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:46.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:47.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:51.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:52.000", "Good", "0.129999995"), + + ], + ["TagName", "EventTime", "Status", "Value"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 3, "seconds", "EventTime", 1) + actual_df = interval_filtering_wrangler.filter() + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + From 9fd41b88f6f9f1eed021c13378e112ed6e35fa44 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:41:35 +0100 Subject: [PATCH 13/13] #22: Linted/Formatted code + fixed import Signed-off-by: Dominik Hoffmann --- .../spark/data_quality/interval_filtering.py | 122 +++++++++++------- .../data_quality/test_interval_filtering.py | 54 +++++--- 2 files changed, 111 insertions(+), 65 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py index e0b353772..b147c47d6 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/interval_filtering.py @@ -14,33 +14,39 @@ from datetime import timedelta import pandas as pd -from databricks.sqlalchemy.test_local.conftest import schema -from exceptiongroup import catch from pyspark.sql import functions as F -from pyspark.sql import SparkSession +from pyspark.sql import SparkSession from pyspark.sql import DataFrame from ...._pipeline_utils.models import Libraries, SystemType from ...interfaces import WranglerBaseInterface + class IntervalFiltering(WranglerBaseInterface): """ - Cleanses a DataFrame by removing rows outside a specified interval window. - Example: + Cleanses a DataFrame by removing rows outside a specified interval window. + Example: - Parameters: - spark (SparkSession): A SparkSession object. - df (DataFrame): PySpark DataFrame to be converted - interval (int): The interval length for cleansing. - interval_unit (str): 'hours', 'minutes', 'seconds' or 'milliseconds' to specify the unit of the interval. - """ + Parameters: + spark (SparkSession): A SparkSession object. + df (DataFrame): PySpark DataFrame to be converted + interval (int): The interval length for cleansing. + interval_unit (str): 'hours', 'minutes', 'seconds' or 'milliseconds' to specify the unit of the interval. 
+ """ """ Default time stamp column name if not set in the constructor """ DEFAULT_TIME_STAMP_COLUMN_NAME: str = "EventTime" - - def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_unit: str, time_stamp_column_name: str = None, tolerance: int = None) -> None: + def __init__( + self, + spark: SparkSession, + df: DataFrame, + interval: int, + interval_unit: str, + time_stamp_column_name: str = None, + tolerance: int = None, + ) -> None: self.spark = spark self.df = df self.interval = interval @@ -48,7 +54,8 @@ def __init__(self, spark: SparkSession, df: DataFrame, interval: int, interval_ self.tolerance = tolerance if time_stamp_column_name is None: self.time_stamp_column_name = self.DEFAULT_TIME_STAMP_COLUMN_NAME - else: self.time_stamp_column_name = time_stamp_column_name + else: + self.time_stamp_column_name = time_stamp_column_name @staticmethod def system_type(): @@ -67,36 +74,51 @@ def libraries(): def settings() -> dict: return {} - def convert_column_to_timestamp(self) -> DataFrame: try: - return self.df.withColumn(self.time_stamp_column_name, F.to_timestamp(self.time_stamp_column_name)) + return self.df.withColumn( + self.time_stamp_column_name, F.to_timestamp(self.time_stamp_column_name) + ) except Exception as e: - raise ValueError(f"Error converting column {self.time_stamp_column_name} to timestamp: {e}") + raise ValueError( + f"Error converting column {self.time_stamp_column_name} to timestamp: {e}" + ) def get_time_delta(self, value: int) -> timedelta: - if self.interval_unit == 'minutes': - return timedelta(minutes = value) - elif self.interval_unit == 'days': - return timedelta(days = value) - elif self.interval_unit == 'hours': - return timedelta(hours = value) - elif self.interval_unit == 'seconds': - return timedelta(seconds = value) - elif self.interval_unit == 'milliseconds': - return timedelta(milliseconds = value) + if self.interval_unit == "minutes": + return timedelta(minutes=value) + elif self.interval_unit == "days": + return timedelta(days=value) + elif self.interval_unit == "hours": + return timedelta(hours=value) + elif self.interval_unit == "seconds": + return timedelta(seconds=value) + elif self.interval_unit == "milliseconds": + return timedelta(milliseconds=value) else: - raise ValueError("interval_unit must be either 'days', 'hours', 'minutes', 'seconds' or 'milliseconds'") - - def check_if_outside_of_interval(self, current_time_stamp: pd.Timestamp, last_time_stamp: pd.Timestamp, time_delta_in_ms: float, tolerance_in_ms: float) -> bool: + raise ValueError( + "interval_unit must be either 'days', 'hours', 'minutes', 'seconds' or 'milliseconds'" + ) + + def check_if_outside_of_interval( + self, + current_time_stamp: pd.Timestamp, + last_time_stamp: pd.Timestamp, + time_delta_in_ms: float, + tolerance_in_ms: float, + ) -> bool: if tolerance_in_ms is None: - return ((current_time_stamp - last_time_stamp).total_seconds() * 1000) >= time_delta_in_ms + return ( + (current_time_stamp - last_time_stamp).total_seconds() * 1000 + ) >= time_delta_in_ms else: - return ((current_time_stamp - last_time_stamp).total_seconds() * 1000) + tolerance_in_ms >= time_delta_in_ms + return ( + (current_time_stamp - last_time_stamp).total_seconds() * 1000 + ) + tolerance_in_ms >= time_delta_in_ms def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str: try: - return time_stamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + return time_stamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] except Exception as e: raise ValueError(f"Error converting timestamp to 
string: {e}") @@ -106,23 +128,28 @@ def filter(self) -> DataFrame: """ if self.time_stamp_column_name not in self.df.columns: - raise ValueError(f"Column {self.time_stamp_column_name} not found in the DataFrame.") + raise ValueError( + f"Column {self.time_stamp_column_name} not found in the DataFrame." + ) original_schema = self.df.schema - self.df = self.convert_column_to_timestamp().orderBy(self.time_stamp_column_name) + self.df = self.convert_column_to_timestamp().orderBy( + self.time_stamp_column_name + ) tolerance_in_ms = None if self.tolerance is not None: tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000 print(tolerance_in_ms) - time_delta_in_ms = self.get_time_delta(self.interval).total_seconds() * 1000 rows = self.df.collect() last_time_stamp = rows[0][self.time_stamp_column_name] first_row = rows[0].asDict() - first_row[self.time_stamp_column_name] = self.format_date_time_to_string(first_row[self.time_stamp_column_name]) + first_row[self.time_stamp_column_name] = self.format_date_time_to_string( + first_row[self.time_stamp_column_name] + ) cleansed_df = [first_row] @@ -130,21 +157,18 @@ def filter(self) -> DataFrame: current_row = rows[i] current_time_stamp = current_row[self.time_stamp_column_name] - if self.check_if_outside_of_interval(current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms): + if self.check_if_outside_of_interval( + current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms + ): current_row_dict = current_row.asDict() - current_row_dict[self.time_stamp_column_name] = self.format_date_time_to_string(current_row_dict[self.time_stamp_column_name]) + current_row_dict[self.time_stamp_column_name] = ( + self.format_date_time_to_string( + current_row_dict[self.time_stamp_column_name] + ) + ) cleansed_df.append(current_row_dict) last_time_stamp = current_time_stamp - result_df = self.spark.createDataFrame(cleansed_df, schema= original_schema) + result_df = self.spark.createDataFrame(cleansed_df, schema=original_schema) return result_df - - - - - - - - - diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 8205dcad5..7ad4944d9 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -14,9 +14,9 @@ import pytest from pyspark.sql import SparkSession -from pyspark.sql import DataFrame as PySparkDataFrame -from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering - +from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import ( + IntervalFiltering, +) @pytest.fixture(scope="session") @@ -47,13 +47,16 @@ def test_interval_detection_easy(spark_session: SparkSession): ["TagName", "EventTime", "Status", "Value"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime") + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "seconds", "EventTime" + ) actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_easy_unordered(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ @@ 
-77,13 +80,16 @@ def test_interval_detection_easy_unordered(spark_session: SparkSession): ["TagName", "EventTime", "Status", "Value"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime") + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "seconds", "EventTime" + ) actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_milliseconds(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ @@ -105,20 +111,22 @@ def test_interval_detection_milliseconds(spark_session: SparkSession): ["TagName", "Time"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 10, "milliseconds", "Time") + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 10, "milliseconds", "Time" + ) actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_minutes(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ ("A2PS64V0JR", "2024-01-02 20:03:46.000"), ("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"), ("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"), - ], ["TagName", "Time"], ) @@ -134,13 +142,16 @@ def test_interval_detection_minutes(spark_session: SparkSession): ["TagName", "Time"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 3, "minutes", "Time") + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 3, "minutes", "Time" + ) actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_hours(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ @@ -162,13 +173,14 @@ def test_interval_detection_hours(spark_session: SparkSession): ["TagName", "EventTime"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours" ) + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours") actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_days(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ @@ -191,13 +203,14 @@ def test_interval_detection_days(spark_session: SparkSession): ["TagName", "EventTime"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "days" ) + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "days") actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession): df = spark_session.createDataFrame( [ @@ -210,11 +223,14 @@ def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSes ["TagName", "EventTime"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours", "Time" ) + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "hours", "Time" + ) with 
pytest.raises(ValueError): interval_filtering_wrangler.filter() + def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession): df = spark_session.createDataFrame( [ @@ -227,11 +243,14 @@ def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession ["TagName", "EventTime"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "years", "EventTime" ) + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "years", "EventTime" + ) with pytest.raises(ValueError): interval_filtering_wrangler.filter() + def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): df = spark_session.createDataFrame( [ @@ -244,11 +263,14 @@ def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): ["TagName", "EventTime"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "minutes", "EventTime" ) + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 1, "minutes", "EventTime" + ) with pytest.raises(ValueError): interval_filtering_wrangler.filter() + def test_interval_tolerance(spark_session: SparkSession): expected_df = spark_session.createDataFrame( [ @@ -268,15 +290,15 @@ def test_interval_tolerance(spark_session: SparkSession): ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.129999995"), ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:51.000", "Good", "0.129999995"), ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:52.000", "Good", "0.129999995"), - ], ["TagName", "EventTime", "Status", "Value"], ) - interval_filtering_wrangler = IntervalFiltering(spark_session, df, 3, "seconds", "EventTime", 1) + interval_filtering_wrangler = IntervalFiltering( + spark_session, df, 3, "seconds", "EventTime", 1 + ) actual_df = interval_filtering_wrangler.filter() assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() -
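
A minimal usage sketch of the IntervalFiltering wrangler as it stands after PATCH 13/13. This is an illustration, not part of the patch series: the import path and the sample rows are assumptions based on the test files above (the path may differ depending on whether rtdip_sdk is installed as a package or run from the repository sources); the constructor signature and filter() behaviour are taken from the final version of interval_filtering.py.

# Hypothetical usage example; adjust the import path to your installation.
from pyspark.sql import SparkSession

from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import (
    IntervalFiltering,
)

spark = (
    SparkSession.builder.master("local[2]")
    .appName("interval-filtering-example")
    .getOrCreate()
)

# Sample data shaped like the test fixtures above.
df = spark.createDataFrame(
    [
        ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:46.000", "Good", "0.129999995"),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:50.000", "Good", "0.150000006"),
    ],
    ["TagName", "EventTime", "Status", "Value"],
)

# Keep a row only if it is at least interval - tolerance (here 3 - 1 = 2 seconds)
# after the previously kept row, ordered by the "EventTime" column.
interval_filtering = IntervalFiltering(spark, df, 3, "seconds", "EventTime", 1)
filtered_df = interval_filtering.filter()
filtered_df.show(truncate=False)

Note that filter() collects all rows to the driver (self.df.collect()) before rebuilding the Spark DataFrame from the kept rows, so this sketch assumes the input is small enough to fit in driver memory.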