From c4752c71c5d3b96d34e47a77639f3483a67c844e Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Sat, 9 Nov 2024 16:02:10 +0100 Subject: [PATCH] #22: Added test cases for thrown exceptions Signed-off-by: Dominik Hoffmann --- .../data_quality/test_interval_filtering.py | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py index 6644e7dfa..4e5ad4d09 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_interval_filtering.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -from hyperframe.frame import DataFrame from pyspark.sql import SparkSession from pyspark.sql import DataFrame as PySparkDataFrame @@ -169,3 +168,55 @@ def test_interval_detection_hours(spark_session: SparkSession): assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema assert expected_df.collect() == actual_df.collect() + +def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours", "Time" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + +def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "years", "EventTime" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() + +def test_interval_detection_faulty_time_stamp(spark_session: SparkSession): + df = spark_session.createDataFrame( + [ + ("A2PS64V0JR", "2024-01-09-02 20:03:46.000"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"), + ("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"), + ("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"), + ], + ["TagName", "EventTime"], + ) + + interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "minutes", "EventTime" ) + + with pytest.raises(ValueError): + interval_filtering_wrangler.filter() +