Skip to content

Commit

Permalink
#22: Added test cases for thrown exceptions
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Hoffmann <[email protected]>
  • Loading branch information
dh1542 committed Nov 9, 2024
1 parent c034b31 commit c4752c7
Showing 1 changed file with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from hyperframe.frame import DataFrame

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as PySparkDataFrame
Expand Down Expand Up @@ -169,3 +168,55 @@ def test_interval_detection_hours(spark_session: SparkSession):
assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

def test_interval_detection_wrong_time_stamp_column_name(spark_session: SparkSession):
df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"),
("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"),
],
["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours", "Time" )

with pytest.raises(ValueError):
interval_filtering_wrangler.filter()

def test_interval_detection_wrong_interval_unit_pass(spark_session: SparkSession):
df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"),
("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"),
],
["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "years", "EventTime" )

with pytest.raises(ValueError):
interval_filtering_wrangler.filter()

def test_interval_detection_faulty_time_stamp(spark_session: SparkSession):
df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-09-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"),
("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"),
],
["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "minutes", "EventTime" )

with pytest.raises(ValueError):
interval_filtering_wrangler.filter()

0 comments on commit c4752c7

Please sign in to comment.