Commit

#22: Expanded test cases to cover all units and multiple intervals
Signed-off-by: Dominik Hoffmann <[email protected]>
dh1542 committed Nov 9, 2024
1 parent 1f3218e commit 396b49f
Showing 1 changed file with 122 additions and 12 deletions.
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering


@@ -25,36 +25,146 @@ def spark_session():
return SparkSession.builder.master("local[2]").appName("test").getOrCreate()


def test_interval_detection_easy(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"],
)

df = spark_session.createDataFrame(
[
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime")
actual_df = interval_filtering_wrangler.filter()

assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

def test_interval_detection_easy_unordered(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"],
)

df = spark_session.createDataFrame(
[
("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"),
("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"),
("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"),
],
["TagName", "EventTime", "Status", "Value"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime")
actual_df = interval_filtering_wrangler.filter()

assert isinstance(actual_df, DataFrame)
assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

def test_interval_detection_milliseconds(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"),
],
["TagName", "Time"],
)

df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.020"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.025"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:03:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"),
],
["TagName", "Time"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 10, "milliseconds", "Time")
actual_df = interval_filtering_wrangler.filter()

assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

def test_interval_detection_minutes(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"),

],
["TagName", "Time"],
)

df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:06:46.000"),
("A2PS64V0J.:ZUX09R", "2024-01-02 20:09:45.999"),
("A2PS64asd.:ZUX09R", "2024-01-02 20:12:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 20:03:46.035"),
],
["TagName", "Time"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 3, "minutes", "Time")
actual_df = interval_filtering_wrangler.filter()

assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

def test_interval_detection_hours(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"),
],
["TagName", "EventTime"],
)

df = spark_session.createDataFrame(
[
("A2PS64V0JR", "2024-01-02 20:03:46.000"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:06:46.000"),
("A2PS64V0J.:ZUX09R", "2024-01-02 21:09:45.999"),
("A2PS64asd.:ZUX09R", "2024-01-02 21:12:46.030"),
("A2PS64V0J.:ZUasdX09R", "2024-01-02 23:03:46.035"),
],
["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours" )
actual_df = interval_filtering_wrangler.filter()

assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
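
For orientation, here is a minimal usage sketch of the wrangler these tests exercise, inferred only from the constructor calls above (Spark session, input DataFrame, interval value, unit string, timestamp column); the tag name and timestamps are made-up illustration data, not part of the commit:

from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering

spark = SparkSession.builder.master("local[2]").appName("example").getOrCreate()

# Three readings a few milliseconds apart; with a 10 ms interval
# the middle row falls inside the window and is filtered out.
df = spark.createDataFrame(
    [
        ("TAG_A", "2024-01-02 20:03:46.000"),
        ("TAG_A", "2024-01-02 20:03:46.005"),  # 5 ms after the last kept row: dropped
        ("TAG_A", "2024-01-02 20:03:46.020"),  # 20 ms after the first row: kept
    ],
    ["TagName", "Time"],
)

# Keep only rows whose Time is at least 10 ms after the previously kept row,
# mirroring the milliseconds test above.
filtered = IntervalFiltering(spark, df, 10, "milliseconds", "Time").filter()
filtered.show(truncate=False)

As the "hours" test suggests, the timestamp column argument appears to be optional, with "EventTime" used when it is omitted.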
