From 25c0301b860c5eebd7801350629e0b27328b4ec1 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Mon, 4 Jul 2022 14:37:05 +0200
Subject: [PATCH] test: dataset split functionality tests

---
 tests/popmon/conftest.py                      |   2 +-
 tests/popmon/pipeline/test_split_dataset.py   | 112 ++++++++++++
 tests/popmon/spark/test_spark.py              |  54 +++---
 .../popmon/spark/test_split_dataset_spark.py  | 161 ++++++++++++++++++
 4 files changed, 302 insertions(+), 27 deletions(-)
 create mode 100644 tests/popmon/pipeline/test_split_dataset.py
 create mode 100644 tests/popmon/spark/test_split_dataset_spark.py

diff --git a/tests/popmon/conftest.py b/tests/popmon/conftest.py
index 6f020a27..619a9b38 100644
--- a/tests/popmon/conftest.py
+++ b/tests/popmon/conftest.py
@@ -62,7 +62,7 @@ def pytest_configure():
         pytest.date = load(f)
 
     with open("{}/{}".format(TEMPLATE_PATH, "eyesColor.json")) as f:
-        pytest.eyesColor = load(f)
+        pytest.eyeColor = load(f)
 
     with open("{}/{}".format(TEMPLATE_PATH, "gender.json")) as f:
         pytest.gender = load(f)
diff --git a/tests/popmon/pipeline/test_split_dataset.py b/tests/popmon/pipeline/test_split_dataset.py
new file mode 100644
index 00000000..5d95d34e
--- /dev/null
+++ b/tests/popmon/pipeline/test_split_dataset.py
@@ -0,0 +1,112 @@
+from datetime import datetime, timedelta
+
+import pandas as pd
+import pytest
+
+from popmon.pipeline.dataset_splitter import split_dataset
+
+
+@pytest.fixture
+def test_dataframe_pandas():
+    n_samples = 1000
+    start = datetime.today()
+    return pd.DataFrame(
+        {
+            "date": [start + timedelta(days=delta) for delta in range(n_samples)],
+            "f1": [1] * n_samples,
+            "f2": [0] * n_samples,
+        }
+    )
+
+
+def test_split_dataset_pandas_int(test_dataframe_pandas):
+    reference, df = split_dataset(test_dataframe_pandas, split=3, time_axis="date")
+
+    assert reference.shape[0] == 3
+    assert df.shape[0] == 997
+    assert reference.columns.values.tolist() == ["date", "f1", "f2"]
+    assert df.columns.values.tolist() == ["date", "f1", "f2"]
+
+
+def test_split_dataset_pandas_int_underflow(test_dataframe_pandas):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=0, time_axis="date")
+
+    assert e.value.args[0] == "Number of instances should be greater than 0"
+
+
+def test_split_dataset_pandas_int_overflow(test_dataframe_pandas):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=1001, time_axis="date")
+
+    assert (
+        e.value.args[0]
+        == "Returned dataframe is empty. Please adjust the `split` argument"
+    )
+
+
+def test_split_dataset_pandas_float(test_dataframe_pandas):
+    reference, df = split_dataset(test_dataframe_pandas, split=0.45, time_axis="date")
+
+    assert reference.shape[0] == 450
+    assert df.shape[0] == 550
+    assert reference.columns.values.tolist() == ["date", "f1", "f2"]
+    assert df.columns.values.tolist() == ["date", "f1", "f2"]
+
+
+def test_split_dataset_pandas_float_round(test_dataframe_pandas):
+    reference, df = split_dataset(test_dataframe_pandas, split=0.8888, time_axis="date")
+
+    assert reference.shape[0] == 888
+    assert df.shape[0] == 112
+    assert reference.columns.values.tolist() == ["date", "f1", "f2"]
+    assert df.columns.values.tolist() == ["date", "f1", "f2"]
+
+
+def test_split_dataset_pandas_float_underflow(test_dataframe_pandas):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=0.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=-1.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+
+def test_split_dataset_pandas_float_overflow(test_dataframe_pandas):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=1.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_pandas, split=10.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+
+def test_split_dataset_pandas_condition(test_dataframe_pandas):
+    reference, df = split_dataset(
+        test_dataframe_pandas,
+        split=test_dataframe_pandas.date
+        < datetime.today() + timedelta(days=50, hours=5),
+        time_axis="date",
+    )
+
+    assert reference.shape[0] == 51
+    assert df.shape[0] == 949
+    assert reference.columns.values.tolist() == ["date", "f1", "f2"]
+    assert df.columns.values.tolist() == ["date", "f1", "f2"]
+
+
+def test_split_dataset_pandas_condition_false(test_dataframe_pandas):
+    with pytest.raises(ValueError) as e:
+        split_dataset(
+            test_dataframe_pandas,
+            split=test_dataframe_pandas.date < datetime.today() - timedelta(days=1),
+            time_axis="date",
+        )
+
+    assert e.value.args[0] == "Reference is empty. Please adjust the `split` argument"
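
NOTE: the pandas tests above pin down the splitting semantics of
split_dataset: a positive int takes the first n rows (ordered by the time
axis) as the reference set, a float strictly between 0 and 1 takes the
corresponding fraction (truncated, so 0.8888 of 1000 rows yields 888, not
889), and a boolean Series acts as a per-row condition. The sketch below is
an illustration derived from those expectations, not the actual
implementation in popmon/pipeline/dataset_splitter.py; the name
split_sketch is hypothetical.

    import pandas as pd

    def split_sketch(df: pd.DataFrame, split, time_axis: str):
        # Sort on the time axis so "first n rows" means "earliest n rows".
        df = df.sort_values(time_axis)
        if isinstance(split, pd.Series):  # boolean condition per row
            reference, remainder = df[split], df[~split]
            if reference.empty:
                raise ValueError("Reference is empty. Please adjust the `split` argument")
            return reference, remainder
        if isinstance(split, float):
            if split <= 0 or split >= 1:
                raise ValueError("Fraction should be 0 > fraction > 1")
            n = int(split * df.shape[0])  # truncates: 0.8888 * 1000 -> 888
        else:
            if split <= 0:
                raise ValueError("Number of instances should be greater than 0")
            n = split
        reference, remainder = df.iloc[:n], df.iloc[n:]
        if remainder.empty:
            raise ValueError("Returned dataframe is empty. Please adjust the `split` argument")
        return reference, remainder
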
diff --git a/tests/popmon/spark/test_spark.py b/tests/popmon/spark/test_spark.py
index 03acf199..65cc6513 100644
--- a/tests/popmon/spark/test_spark.py
+++ b/tests/popmon/spark/test_spark.py
@@ -1,4 +1,5 @@
-from os.path import abspath, dirname, join
+from copy import deepcopy
+from pathlib import Path
 
 import pandas as pd
 import pytest
@@ -20,19 +21,16 @@ def spark_context():
     if not spark_found:
         return None
 
-    current_path = dirname(abspath(__file__))
+    current_path = Path(__file__).parent
 
     scala = "2.12" if int(pyspark_version[0]) >= 3 else "2.11"
-    hist_spark_jar = join(
-        current_path, f"jars/histogrammar-sparksql_{scala}-1.0.11.jar"
-    )
-    hist_jar = join(current_path, f"jars/histogrammar_{scala}-1.0.11.jar")
+    hist_spark_jar = current_path / f"jars/histogrammar-sparksql_{scala}-1.0.11.jar"
+    hist_jar = current_path / f"jars/histogrammar_{scala}-1.0.11.jar"
 
     spark = (
         SparkSession.builder.master("local")
         .appName("popmon-pytest")
         .config("spark.jars", f"{hist_spark_jar},{hist_jar}")
-        .config("spark.sql.execution.arrow.enabled", "false")
         .config("spark.sql.session.timeZone", "GMT")
         .getOrCreate()
     )
@@ -51,8 +49,8 @@ def test_spark_stability_metrics(spark_context):
     features = ["date:isActive", "date:eyeColor", "date:latitude"]
     bin_specs = {
         "date": {
-            "bin_width": pd.Timedelta("1y").value,
-            "bin_offset": pd.Timestamp("2000-1-1").value,
+            "bin_width": pd.Timedelta(365, "days").value,
+            "bin_offset": pd.Timestamp(year=2000, month=1, day=1).value,
         },
         "latitude": {"bin_width": 5.0, "bin_offset": 0.0},
     }
@@ -75,16 +73,17 @@
     "ignore:createDataFrame attempted Arrow optimization because"
 )
 def test_spark_make_histograms(spark_context):
-    pytest.age["data"]["name"] = "b'age'"
-    pytest.company["data"]["name"] = "b'company'"
-    pytest.eyesColor["data"]["name"] = "b'eyeColor'"
-    pytest.gender["data"]["name"] = "b'gender'"
-    pytest.isActive["data"]["name"] = "b'isActive'"
-    pytest.latitude["data"]["name"] = "b'latitude'"
-    pytest.longitude["data"]["name"] = "b'longitude'"
-    pytest.transaction["data"]["name"] = "b'transaction'"
-
-    pytest.latitude_longitude["data"]["name"] = "b'latitude:longitude'"
+    names = [
+        "age",
+        "company",
+        "eyeColor",
+        "gender",
+        "latitude",
+        "longitude",
+        "transaction",
+    ]
+
+    pytest.latitude_longitude["data"]["name"] = "'latitude:longitude'"
     pytest.latitude_longitude["data"]["bins:name"] = "unit_func"
 
     spark_df = spark_context.createDataFrame(pytest.test_df)
@@ -113,10 +112,13 @@
         binning="unit",
     )
 
-    assert current_hists["age"].toJson() == pytest.age
-    assert current_hists["company"].toJson() == pytest.company
-    assert current_hists["eyeColor"].toJson() == pytest.eyesColor
-    assert current_hists["gender"].toJson() == pytest.gender
-    assert current_hists["latitude"].toJson() == pytest.latitude
-    assert current_hists["longitude"].toJson() == pytest.longitude
-    assert current_hists["transaction"].toJson() == pytest.transaction
+    # backwards compatibility
+    for name in names:
+        v1 = deepcopy(getattr(pytest, name))
+        v1["data"]["name"] = f"'{name}'"
+
+        v2 = deepcopy(getattr(pytest, name))
+        v2["data"]["name"] = f"b'{name}'"
+
+        output = current_hists[name].toJson()
+        assert output == v1 or output == v2
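
NOTE: the "backwards compatibility" loop above accepts two serializations
of the feature name, because depending on the histogrammar version,
toJson() renders the name either as a plain quoted string or as a Python
bytes repr. A minimal standalone illustration with hypothetical values:

    expected_v1 = {"data": {"name": "'age'"}}   # newer serialization
    expected_v2 = {"data": {"name": "b'age'"}}  # older bytes-repr serialization
    output = {"data": {"name": "b'age'"}}       # stand-in for current_hists["age"].toJson()
    assert output == expected_v1 or output == expected_v2
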
diff --git a/tests/popmon/spark/test_split_dataset_spark.py b/tests/popmon/spark/test_split_dataset_spark.py
new file mode 100644
index 00000000..be0f7534
--- /dev/null
+++ b/tests/popmon/spark/test_split_dataset_spark.py
@@ -0,0 +1,161 @@
+from datetime import datetime, timedelta
+from pathlib import Path
+
+import pandas as pd
+import pytest
+
+from popmon.pipeline.dataset_splitter import split_dataset
+
+try:
+    from pyspark import __version__ as pyspark_version
+    from pyspark.sql import SparkSession
+
+    spark_found = True
+except (ModuleNotFoundError, AttributeError):
+    spark_found = False
+
+
+@pytest.fixture
+def spark_context():
+    if not spark_found:
+        return None
+
+    current_path = Path(__file__).parent
+
+    scala = "2.12" if int(pyspark_version[0]) >= 3 else "2.11"
+    hist_spark_jar = current_path / f"jars/histogrammar-sparksql_{scala}-1.0.11.jar"
+    hist_jar = current_path / f"jars/histogrammar_{scala}-1.0.11.jar"
+
+    spark = (
+        SparkSession.builder.master("local")
+        .appName("popmon-pytest")
+        .config("spark.jars", f"{hist_spark_jar},{hist_jar}")
+        .config("spark.sql.session.timeZone", "GMT")
+        .getOrCreate()
+    )
+    return spark
+
+
+@pytest.fixture
+def test_dataframe_spark(spark_context):
+    n_samples = 1000
+    start = datetime.today()
+    df = pd.DataFrame(
+        {
+            "date": [start + timedelta(days=delta) for delta in range(n_samples)],
+            "f1": [1] * n_samples,
+            "f2": [0] * n_samples,
+        }
+    )
+    spark_df = spark_context.createDataFrame(df)
+    return spark_df
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_int(test_dataframe_spark):
+    reference, df = split_dataset(test_dataframe_spark, split=3, time_axis="date")
+
+    assert reference.count() == 3
+    assert df.count() == 997
+    assert reference.columns == ["date", "f1", "f2"]
+    assert df.columns == ["date", "f1", "f2"]
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_int_underflow(test_dataframe_spark):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=0, time_axis="date")
+
+    assert e.value.args[0] == "Number of instances should be greater than 0"
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_int_overflow(test_dataframe_spark):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=1001, time_axis="date")
+
+    assert (
+        e.value.args[0]
+        == "Returned dataframe is empty. Please adjust the `split` argument"
+    )
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_float(test_dataframe_spark):
+    reference, df = split_dataset(test_dataframe_spark, split=0.45, time_axis="date")
+
+    assert reference.count() == 450
+    assert df.count() == 550
+    assert reference.columns == ["date", "f1", "f2"]
+    assert df.columns == ["date", "f1", "f2"]
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_float_round(test_dataframe_spark):
+    reference, df = split_dataset(test_dataframe_spark, split=0.8888, time_axis="date")
+
+    assert reference.count() == 888
+    assert df.count() == 112
+    assert reference.columns == ["date", "f1", "f2"]
+    assert df.columns == ["date", "f1", "f2"]
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_float_underflow(test_dataframe_spark):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=0.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=-1.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_float_overflow(test_dataframe_spark):
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=1.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+    with pytest.raises(ValueError) as e:
+        _ = split_dataset(test_dataframe_spark, split=10.0, time_axis="date")
+
+    assert e.value.args[0] == "Fraction should be 0 > fraction > 1"
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_condition(test_dataframe_spark):
+    reference, df = split_dataset(
+        test_dataframe_spark,
+        split=f"date < '{(datetime.today() + timedelta(days=50, hours=5)).strftime('%Y-%m-%d %H:%M:%S')}'",
+        time_axis="date",
+    )
+
+    assert reference.count() == 51
+    assert df.count() == 949
+    assert reference.columns == ["date", "f1", "f2"]
+    assert df.columns == ["date", "f1", "f2"]
+
+
+@pytest.mark.spark
+@pytest.mark.skipif(not spark_found, reason="spark not found")
+def test_split_dataset_spark_condition_false(test_dataframe_spark):
+    with pytest.raises(ValueError) as e:
+        split_dataset(
+            test_dataframe_spark,
+            split=f"date < '{(datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')}'",
+            time_axis="date",
+        )
+
+    assert e.value.args[0] == "Reference is empty. Please adjust the `split` argument"
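
NOTE: on the Spark path, the condition tests above pass `split` as a SQL
filter string rather than a boolean Series. A minimal sketch of the
semantics those tests imply (again an illustration, not the actual popmon
code; the name split_spark_sketch is hypothetical):

    from pyspark.sql import DataFrame

    def split_spark_sketch(sdf: DataFrame, split: str, time_axis: str):
        # A string split acts as a SQL filter expression over the time axis,
        # e.g. "date < '2022-08-23 14:37:05'"; rows matching the condition
        # form the reference set, the rest form the returned dataframe.
        reference = sdf.filter(split)
        remainder = sdf.filter(f"NOT ({split})")
        if reference.count() == 0:
            raise ValueError("Reference is empty. Please adjust the `split` argument")
        return reference, remainder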