Skip to content

Commit

Permalink
Updates for returning data frames (#103)
Browse files Browse the repository at this point in the history
* Add docs for returning spark data frames
* Move test for has errors up
* Refactor row to record
* Update test result conversion test
* Update measurements conversion test
* Update error conversion test
  • Loading branch information
JCZuurmond authored Dec 27, 2021
1 parent a1d49e8 commit 1036d8a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 88 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Add example to README about returning spark data frames as scan result

## [0.3.0] - 2021-12-22

- Provides the ability to get the scan results in Dataframes. ([#99](https://github.com/sodadata/soda-spark/pull/99))
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ See the
[scan result object](https://github.com/sodadata/soda-sql/blob/main/core/sodasql/scan/scan_result.py)
for all attributes and methods.

Or, return Spark data frames:

``` python
>>> measurements, test_results, errors = scan.execute(scan_yml, df, as_frames=True)
>>>
>>> measurements # doctest: +ELLIPSIS
DataFrame[metric: string, column_name: string, value: string, ...]
>>> test_results # doctest: +ELLIPSIS
DataFrame[test: struct<...>, passed: boolean, skipped: boolean, values: map<string,string>, ...]
>>>
```

See the `_to_data_frame` functions in the [`scan.py`](./src/sodaspark/scan.py)
to see how the conversion is done.

### Send results to Soda cloud

Send the scan result to Soda cloud.
Expand Down
16 changes: 5 additions & 11 deletions src/sodaspark/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,7 @@ def measurements_to_data_frame(measurements: list[Measurement]) -> DataFrame:
]
)
spark_session = SparkSession.builder.getOrCreate()
out = spark_session.createDataFrame(
measurements,
schema=schema,
)
out = spark_session.createDataFrame(measurements, schema=schema)
return out


Expand All @@ -333,12 +330,12 @@ def test_results_to_data_frame(test_results: list[TestResult]) -> DataFrame:
"""
schema_test = T.StructType(
[
T.StructField("column", T.StringType(), True),
T.StructField("expression", T.StringType(), True),
T.StructField("id", T.StringType(), True),
T.StructField("title", T.StringType(), True),
T.StructField("expression", T.StringType(), True),
T.StructField("metrics", T.ArrayType(T.StringType(), True), True),
T.StructField("column", T.StringType(), True),
T.StructField("source", T.StringType(), True),
T.StructField("title", T.StringType(), True),
]
)

Expand All @@ -357,10 +354,7 @@ def test_results_to_data_frame(test_results: list[TestResult]) -> DataFrame:
]
)
spark_session = SparkSession.builder.getOrCreate()
out = spark_session.createDataFrame(
test_results,
schema=schema,
)
out = spark_session.createDataFrame(test_results, schema=schema)
return out


Expand Down
169 changes: 92 additions & 77 deletions tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import BinaryIO

import pytest
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql import functions as F # noqa: N812
from pyspark.sql import types as T # noqa: N812
from sodasql.dialects.spark_dialect import SparkDialect
Expand Down Expand Up @@ -145,7 +145,7 @@ def scan_definition() -> str:


@dataclass
class Row:
class Record:
id: str
name: str
size: int
Expand All @@ -161,12 +161,12 @@ def df(spark_session: SparkSession) -> DataFrame:
date = dt.date(2021, 1, 1)
id = "a76824f0-50c0-11eb-8be8-88e9fe6293fd"
data = [
Row(id, "Paula Landry", 3006, date, "28,42 %", "UK"),
Row(id, "Kevin Crawford", 7243, date, "22,75 %", "NL"),
Row(id, "Kimberly Green", 6589, date, "11,92 %", "US"),
Row(id, "William Fox", 1972, date, "14,26 %", "UK"),
Row(id, "Cynthia Gonzales", 3687, date, "18,32 %", "US"),
Row(id, "Kim Brown", 1277, date, "16,37 %", "US"),
Record(id, "Paula Landry", 3006, date, "28,42 %", "UK"),
Record(id, "Kevin Crawford", 7243, date, "22,75 %", "NL"),
Record(id, "Kimberly Green", 6589, date, "11,92 %", "US"),
Record(id, "William Fox", 1972, date, "14,26 %", "UK"),
Record(id, "Cynthia Gonzales", 3687, date, "18,32 %", "US"),
Record(id, "Kim Brown", 1277, date, "16,37 %", "US"),
]

schema = T.StructType(
Expand Down Expand Up @@ -215,6 +215,17 @@ def test_create_scan_has_spark_dialect(
assert isinstance(scanner.dialect, SparkDialect)


def test_scan_execute_scan_result_does_not_contain_any_errors(
scan_definition: str,
df: DataFrame,
) -> None:
"""The scan results should not contain any erros."""

scan_result = scan.execute(scan_definition, df)

assert not scan_result.has_errors()


@pytest.mark.parametrize(
"measurement",
[
Expand Down Expand Up @@ -335,17 +346,6 @@ def test_scan_execute_contains_expected_test_result(
)


def test_scan_execute_scan_result_does_not_contain_any_errors(
scan_definition: str,
df: DataFrame,
) -> None:
"""The scan results should not contain any erros."""

scan_result = scan.execute(scan_definition, df)

assert not scan_result.has_errors()


def test_excluded_columns_date_is_not_present(
scan_definition: str,
df: DataFrame,
Expand Down Expand Up @@ -376,25 +376,72 @@ def test_scan_execute_with_soda_server_client_scan_result_does_not_contain_any_e
assert not scan_result.has_errors()


def test_measurements_to_data_frame(spark_session: SparkSession) -> None:
"""
Test conversions of measurements to dataframe.
A failure of this test indicates that the `Measurement` dataclass has been
changed in `soda-sql`. If a failure happens, the code needs to be updated to
    accommodate that change. Start with updating the expected output data
frame in this test, then change the schema used for converting the
measurements.
"""
expected = spark_session.createDataFrame(
[
Row(
metric="values_count",
column_name="officename",
value="",
group_values=[
Row(group={"statename": "statename"}, value="9872")
],
)
]
).withColumn(
"value", F.when(F.col("value") == "", None).otherwise(F.col("value"))
)

measurements = [
Measurement(
metric="values_count",
column_name="officename",
value=None,
group_values=[
GroupValue(group={"statename": "statename"}, value="9872")
],
)
]
out = scan.measurements_to_data_frame(measurements)
assert expected.collect() == out.collect()


def test_test_results_to_data_frame(spark_session: SparkSession) -> None:
"""Test conversions of test_result to dataframe."""
"""
Test conversions of test_result to dataframe.
A failure of this test indicates that the `TestResult` dataclass has been
changed in `soda-sql`. If a failure happens, the code needs to be updated to
    accommodate that change. Start with updating the expected output data
frame in this test, then change the schema used for converting the test
results.
"""
expected = spark_session.createDataFrame(
[
{
"test": Test(
Row(
test=Row(
id="id",
title="title",
expression="expression",
metrics=["metrics"],
column="column",
source="source",
),
"passed": True,
"skipped": False,
"values": {"value": "10"},
"error": "exception",
"group_values": {"group": "by"},
}
passed=True,
skipped=False,
values={"value": "10"},
error="exception",
group_values={"group": "by"},
)
]
)

Expand All @@ -416,68 +463,36 @@ def test_test_results_to_data_frame(spark_session: SparkSession) -> None:
)
]
out = scan.test_results_to_data_frame(test_results)
assert (
expected.select(sorted(expected.columns)).collect()
== out.select(sorted(out.columns)).collect()
)


def test_measurements_to_data_frame(spark_session: SparkSession) -> None:
"""Test conversions of measurements to dataframe."""
expected = spark_session.createDataFrame(
[
{
"metric": "values_count",
"column_name": "officename",
"value": "",
"group_values": [
GroupValue(group={"statename": "statename"}, value="9872")
],
}
]
).withColumn(
"value", F.when(F.col("value") == "", None).otherwise(F.col("value"))
)
assert expected.collect() == out.collect()

measurements = [
Measurement(
metric="values_count",
column_name="officename",
value=None,
group_values=[
GroupValue(group={"statename": "statename"}, value="9872")
],
)
]
out = scan.measurements_to_data_frame(measurements)
assert (
expected.select(sorted(expected.columns)).collect()
== out.select(sorted(out.columns)).collect()
)

def test_scan_error_to_data_frame(spark_session: SparkSession) -> None:
"""
Test conversions of scan_error to dataframe.
def test_scanerror_to_data_frame(spark_session: SparkSession) -> None:
"""Test conversions of scan_error to dataframe."""
A failure of this test indicates that the `ScanError` dataclass has been
changed in `soda-sql`. If a failure happens, the code needs to be updated to
    accommodate that change. Start with updating the expected output data
frame in this test, then change the schema used for converting the scan
error.
"""
expected = spark_session.createDataFrame(
[
{
"message": 'Test "metric_name > 30" failed',
"exception": "name 'metric_name' is not defined",
}
Row(
message='Test "metric_name > 30" failed',
exception="name 'metric_name' is not defined",
)
]
)

scanerrors = [
scan_errors = [
TestExecutionScanError(
message='Test "metric_name > 30" failed',
exception="name 'metric_name' is not defined",
)
]
out = scan.scan_errors_to_data_frame(scanerrors)
assert (
expected.select(sorted(expected.columns)).collect()
== out.select(sorted(out.columns)).collect()
)
out = scan.scan_errors_to_data_frame(scan_errors)
assert expected.collect() == out.collect()


def test_scan_execute_return_as_data_frame(
Expand Down

0 comments on commit 1036d8a

Please sign in to comment.