diff --git a/src/main/scala/de/frosner/ddq/constraints/CustomConstraint.scala b/src/main/scala/de/frosner/ddq/constraints/CustomConstraint.scala new file mode 100644 index 0000000..b9e0433 --- /dev/null +++ b/src/main/scala/de/frosner/ddq/constraints/CustomConstraint.scala @@ -0,0 +1,37 @@ +package de.frosner.ddq.constraints + +import org.apache.spark.sql.DataFrame + +import CustomConstraint.{FailureMsg, SuccessMsg} + +import scala.util.Try + +case class CustomConstraint(name: String, + constraintFunction: DataFrame => Either[FailureMsg, SuccessMsg] + ) extends Constraint { + + val fun = (df: DataFrame) => { + val tryFun = Try(constraintFunction(df)) + val messagePrefix = s"Custom constraint '$name'" + val message = tryFun.map { + case Left(failureMsg) => s"$messagePrefix failed: $failureMsg" + case Right(successMsg) => s"$messagePrefix succeeded: $successMsg" + }.recover { + case throwable => s"$messagePrefix errored: $throwable" + }.get + val status = ConstraintUtil.tryToStatus[Either[FailureMsg, SuccessMsg]](tryFun, _.isRight) + CustomConstraintResult(this, message, status) + } + +} + +case class CustomConstraintResult(constraint: CustomConstraint, + message: String, + status: ConstraintStatus) extends ConstraintResult[CustomConstraint] + +object CustomConstraint { + + type SuccessMsg = String + type FailureMsg = String + +} diff --git a/src/main/scala/de/frosner/ddq/core/Check.scala b/src/main/scala/de/frosner/ddq/core/Check.scala index 8f6b0a0..5bb5444 100644 --- a/src/main/scala/de/frosner/ddq/core/Check.scala +++ b/src/main/scala/de/frosner/ddq/core/Check.scala @@ -3,6 +3,7 @@ package de.frosner.ddq.core import java.text.SimpleDateFormat import java.util.UUID +import de.frosner.ddq.constraints.CustomConstraint.{FailureMsg, SuccessMsg} import de.frosner.ddq.constraints._ import de.frosner.ddq.reporters.{ConsoleReporter, Reporter} import de.frosner.ddq.{constraints, core} @@ -185,6 +186,18 @@ case class Check(dataFrame: DataFrame, Check.isEqualTo(other) ) + /** + * Check a custom constraint. It's up to you! If you think this constraint makes sense, feel free to + * create a feature request or provide a pull request. + * + * @param name what the constraint is called + * @param fun function that computes the constraint and returns either a failure or success + * @return [[core.Check]] object + */ + def custom(name: String, fun: DataFrame => Either[FailureMsg, SuccessMsg]): Check = addConstraint( + Check.custom(name, fun) + ) + /** * Run check with all the previously specified constraints and report to every reporter passed as an argument * @@ -397,4 +410,15 @@ object Check { def isEqualTo(other: DataFrame): Constraint = ExactEqualityConstraint(other) + /** + * Check a custom constraint. It's up to you! If you think this constraint makes sense, feel free to + * create a feature request or provide a pull request. + * + * @param name what the constraint is called + * @param fun function that computes the constraint and returns either a failure or success + * @return [[constraints.Constraint]] object + */ + def custom(name: String, fun: DataFrame => Either[FailureMsg, SuccessMsg]): Constraint = + CustomConstraint(name, fun) + } diff --git a/src/test/scala/de/frosner/ddq/constraints/CustomConstraintTest.scala b/src/test/scala/de/frosner/ddq/constraints/CustomConstraintTest.scala new file mode 100644 index 0000000..784c4a9 --- /dev/null +++ b/src/test/scala/de/frosner/ddq/constraints/CustomConstraintTest.scala @@ -0,0 +1,59 @@ +package de.frosner.ddq.constraints + +import de.frosner.ddq.core.Check +import de.frosner.ddq.testutils.{SparkContexts, TestData} +import org.apache.spark.sql.AnalysisException +import org.scalatest.{FlatSpec, Matchers} + +class CustomConstraintTest extends FlatSpec with Matchers with SparkContexts { + + "A CustomConstraint" should "succeed if the function returns a success message" in { + val constraintName = "name" + val successMsg = "success" + val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, { + df => Right(successMsg) + }) + val constraint = check.constraints.head + val result = CustomConstraintResult( + constraint = constraint.asInstanceOf[CustomConstraint], + message = s"Custom constraint '$constraintName' succeeded: $successMsg", + status = ConstraintSuccess + ) + check.run().constraintResults shouldBe Map(constraint -> result) + } + + it should "fail if the function returns a failure message" in { + val constraintName = "name" + val failureMsg = "failure" + val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, { + df => Left(failureMsg) + }) + val constraint = check.constraints.head + val result = CustomConstraintResult( + constraint = constraint.asInstanceOf[CustomConstraint], + message = s"Custom constraint '$constraintName' failed: $failureMsg", + status = ConstraintFailure + ) + check.run().constraintResults shouldBe Map(constraint -> result) + } + + it should "error if the function throws an exception" in { + val constraintName = "name" + val exception = new Exception() + val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, { + df => throw exception + }) + val constraint = check.constraints.head + val result = check.run().constraintResults(constraint) + result match { + case CustomConstraintResult( + customConstraint: CustomConstraint, + "Custom constraint 'name' errored: java.lang.Exception", + constraintError: ConstraintError + ) => { + constraintError.throwable shouldBe exception + } + } + } + +}