Skip to content

Commit

Permalink
Merge pull request #129 from FRosner/issue/128
Browse files Browse the repository at this point in the history
#128 custom constraints (issue/128)
  • Loading branch information
FRosner authored Jan 31, 2017
2 parents 7e1482a + 4ec1544 commit 32ed018
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/main/scala/de/frosner/ddq/constraints/CustomConstraint.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package de.frosner.ddq.constraints

import org.apache.spark.sql.DataFrame

import CustomConstraint.{FailureMsg, SuccessMsg}

import scala.util.Try

/**
 * A constraint whose logic is supplied by the caller as a plain function.
 *
 * @param name               label used when describing this constraint in result messages
 * @param constraintFunction function evaluated against the data frame; a `Left` carries a
 *                           failure message and a `Right` carries a success message
 */
case class CustomConstraint(name: String,
                            constraintFunction: DataFrame => Either[FailureMsg, SuccessMsg]
                           ) extends Constraint {

  /**
   * Evaluates the user function against the given data frame and wraps the outcome in a
   * [[CustomConstraintResult]]. An exception thrown by the user function is captured by `Try`
   * and reported in the message as "errored" instead of being propagated.
   */
  val fun = (df: DataFrame) => {
    val prefix = s"Custom constraint '$name'"
    val outcome = Try(constraintFunction(df))

    // The match stays inside `map` on purpose: a result that matches neither case is turned
    // into a Failure by Try and therefore ends up in the "errored" branch below.
    val described = outcome.map {
      case Right(successMsg) => s"$prefix succeeded: $successMsg"
      case Left(failureMsg) => s"$prefix failed: $failureMsg"
    }
    val message = described.recover {
      case throwable => s"$prefix errored: $throwable"
    }.get

    // Status is derived from the raw outcome; the predicate marks `Right` as the passing case.
    val status = ConstraintUtil.tryToStatus[Either[FailureMsg, SuccessMsg]](
      outcome,
      result => result.isRight
    )

    CustomConstraintResult(constraint = this, message = message, status = status)
  }

}

/**
 * Result of evaluating a [[CustomConstraint]] against a data frame.
 *
 * @param constraint the custom constraint that produced this result
 * @param message    human-readable description of the outcome
 * @param status     status of the evaluation
 */
case class CustomConstraintResult(constraint: CustomConstraint,
message: String,
status: ConstraintStatus) extends ConstraintResult[CustomConstraint]

/** Type aliases that name the two sides of the `Either` returned by a custom constraint function. */
object CustomConstraint {

// Message carried by a `Right`, i.e. reported when the custom function signals success.
type SuccessMsg = String
// Message carried by a `Left`, i.e. reported when the custom function signals failure.
type FailureMsg = String

}
24 changes: 24 additions & 0 deletions src/main/scala/de/frosner/ddq/core/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package de.frosner.ddq.core
import java.text.SimpleDateFormat
import java.util.UUID

import de.frosner.ddq.constraints.CustomConstraint.{FailureMsg, SuccessMsg}
import de.frosner.ddq.constraints._
import de.frosner.ddq.reporters.{ConsoleReporter, Reporter}
import de.frosner.ddq.{constraints, core}
Expand Down Expand Up @@ -185,6 +186,18 @@ case class Check(dataFrame: DataFrame,
Check.isEqualTo(other)
)

/**
* Check a custom constraint. It's up to you! If you think this constraint makes sense, feel free to
* create a feature request or provide a pull request.
*
* @param name what the constraint is called
* @param fun function that computes the constraint and returns either a failure message (`Left`) or a success message (`Right`)
* @return [[core.Check]] object
*/
def custom(name: String, fun: DataFrame => Either[FailureMsg, SuccessMsg]): Check = {
  // Build the constraint through the companion factory, then register it on this check.
  val customConstraint = Check.custom(name, fun)
  addConstraint(customConstraint)
}

/**
* Run check with all the previously specified constraints and report to every reporter passed as an argument
*
Expand Down Expand Up @@ -397,4 +410,15 @@ object Check {
def isEqualTo(other: DataFrame): Constraint =
// Thin factory: wraps the data frame to compare against in an ExactEqualityConstraint.
ExactEqualityConstraint(other)

/**
* Check a custom constraint. It's up to you! If you think this constraint makes sense, feel free to
* create a feature request or provide a pull request.
*
* @param name what the constraint is called
* @param fun function that computes the constraint and returns either a failure message (`Left`) or a success message (`Right`)
* @return [[constraints.Constraint]] object
*/
def custom(name: String, fun: DataFrame => Either[FailureMsg, SuccessMsg]): Constraint = {
  // A custom constraint is the user-supplied function paired with the name used in messages.
  CustomConstraint(name = name, constraintFunction = fun)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package de.frosner.ddq.constraints

import de.frosner.ddq.core.Check
import de.frosner.ddq.testutils.{SparkContexts, TestData}
import org.apache.spark.sql.AnalysisException
import org.scalatest.{FlatSpec, Matchers}

/**
 * Behavioural tests for [[CustomConstraint]]: one test each for a function that reports success,
 * a function that reports failure, and a function that throws.
 */
class CustomConstraintTest extends FlatSpec with Matchers with SparkContexts {

  /**
   * Returns the first constraint registered on the check as a [[CustomConstraint]].
   * Fails the test with a readable message (instead of a ClassCastException from a blind
   * downcast) if the first constraint has a different type.
   */
  private def firstCustomConstraint(check: Check): CustomConstraint =
    check.constraints.head match {
      case custom: CustomConstraint => custom
      case other => fail(s"Expected a CustomConstraint but found: $other")
    }

  "A CustomConstraint" should "succeed if the function returns a success message" in {
    val constraintName = "name"
    val successMsg = "success"
    val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, {
      _ => Right(successMsg)
    })
    val constraint = firstCustomConstraint(check)
    val expected = CustomConstraintResult(
      constraint = constraint,
      message = s"Custom constraint '$constraintName' succeeded: $successMsg",
      status = ConstraintSuccess
    )
    check.run().constraintResults shouldBe Map(constraint -> expected)
  }

  it should "fail if the function returns a failure message" in {
    val constraintName = "name"
    val failureMsg = "failure"
    val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, {
      _ => Left(failureMsg)
    })
    val constraint = firstCustomConstraint(check)
    val expected = CustomConstraintResult(
      constraint = constraint,
      message = s"Custom constraint '$constraintName' failed: $failureMsg",
      status = ConstraintFailure
    )
    check.run().constraintResults shouldBe Map(constraint -> expected)
  }

  it should "error if the function throws an exception" in {
    val constraintName = "name"
    val exception = new Exception()
    val check = Check(TestData.makeNullableStringDf(spark, List("a"))).custom(constraintName, {
      _ => throw exception
    })
    val constraint = firstCustomConstraint(check)
    val result = check.run().constraintResults(constraint)
    result match {
      case CustomConstraintResult(_: CustomConstraint, message, constraintError: ConstraintError) =>
        // Derive the expected text from the fixtures instead of hard-coding their string forms.
        message shouldBe s"Custom constraint '$constraintName' errored: $exception"
        constraintError.throwable shouldBe exception
      case other =>
        // Without this branch a mismatch would surface as a bare scala.MatchError.
        fail(s"Expected an errored CustomConstraintResult but got: $other")
    }
  }

}

0 comments on commit 32ed018

Please sign in to comment.