Home
The core concepts in Wharlord are checks, constraints, reporters and the runner.
A check contains a set of constraints for one data frame. Each constraint checks a specific property related to data quality on this data frame. When you pass a set of checks to a runner, the runner executes them and hands the results over to a set of reporters.
To create a check, it is sufficient to specify the data frame you would like to work with.
Check(dataFrame: DataFrame)
There are more options available for advanced users:
- You can specify a display name, which will be used instead of the default string representation in reports.
- You can select a cache method (default is MEMORY_ONLY). Caching the data frame makes sense if you execute many different constraints on it and it is not cached already. If you don't want to cache, pass None.
- If you prefer not to add constraints through the fluent interface, you can specify a set of constraints upfront.
- You can also pick an ID instead of the randomly generated one. More sophisticated reporters use the ID to identify a check, so don't change it unless you need to.
Check(
dataFrame: DataFrame,
displayName: Option[String] = Option.empty,
cacheMethod: Option[StorageLevel] = Check.defaultCacheMethod,
constraints: Seq[Constraint] = Seq.empty,
id: String = UUID.randomUUID.toString
)
A check offers multiple constraints to verify. You can add them using a fluent interface.
Check whether the given constraint is satisfied. You may provide a constraint as a SQL string or a Column instance.
def satisfies(constraint: String): Check
def satisfies(constraint: Column): Check
Check(customers).satisfies("age > 0")
Check(customers).satisfies(customers("age") > 0)
Check whether the given conditional constraint is satisfied. Be aware that this can cause problems with null values. A -> B gets translated to !A || B, and in Spark SQL comparing null to anything yields null rather than true or false.
def satisfies(conditional: (Column, Column)): Check
Check(customers).satisfies((customers("age") > 50) -> (customers("seniority") === "high"))
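To see where nulls bite, here is a small plain-Scala model of SQL three-valued logic in which None stands for NULL. It is a sketch of how !A || B evaluates, not Wharlord's implementation. The page does not say whether the library counts a row whose condition evaluates to NULL as passing or failing.

```scala
// Minimal model of SQL three-valued logic: None stands for SQL NULL.
object ThreeValuedLogic {
  type SqlBool = Option[Boolean]

  // NOT NULL is NULL
  def not(a: SqlBool): SqlBool = a.map(!_)

  // TRUE wins over NULL; FALSE OR NULL stays NULL
  def or(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // A -> B is evaluated as !A || B
  def implies(a: SqlBool, b: SqlBool): SqlBool = or(not(a), b)

  // "age > 50" where a missing age is NULL
  def ageOver50(age: Option[Int]): SqlBool = age.map(_ > 50)

  // "seniority = 'high'" where a missing seniority is NULL
  def isHigh(seniority: Option[String]): SqlBool = seniority.map(_ == "high")
}
```

In this model, a customer with unknown age and low seniority evaluates to NULL rather than to true or false. That is the case the warning above refers to.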
Check whether the column with the given name contains only null values.
def isAlwaysNull(columnName: String): Check
Check(customers).isAlwaysNull("complaint")
Check whether the column with the given name contains no null values.
def isNeverNull(columnName: String): Check
Check(customers).isNeverNull("age")
Check whether all values in the column with the given name match the specified regular expression.
def isMatchingRegex(columnName: String, regex: String): Check
Check(customers).isMatchingRegex("email", "(?i)^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$")
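Regular expressions are case-sensitive by default. In an email pattern like this one, the [A-Z] classes only accept upper-case letters unless the inline (?i) flag is added. The plain-Scala sketch below shows the difference. The helper object is illustrative only and says nothing about how Wharlord applies the pattern internally.

```scala
import scala.util.matching.Regex

// Plain-Scala sketch: count values that do not match a regular expression.
object RegexSketch {
  val emailPattern = "^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$"

  // Without a flag, [A-Z] matches upper-case letters only.
  val caseSensitive: Regex = emailPattern.r
  // The inline (?i) flag makes the whole pattern case-insensitive.
  val caseInsensitive: Regex = ("(?i)" + emailPattern).r

  def countNonMatching(values: Seq[String], regex: Regex): Int =
    values.count(value => regex.findFirstIn(value).isEmpty)
}
```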
Check whether the column with the given name contains only values from the specified set.
def isAnyOf(columnName: String, allowed: Set[Any]): Check
Check(customers).isAnyOf("gender", Set("m", "f"))
Check whether the column with the given name can be converted to a date using the specified date format.
def isFormattedAsDate(columnName: String, dateFormat: SimpleDateFormat): Check
Check(contracts).isFormattedAsDate("signatureDate", new SimpleDateFormat("yyyy-MM-dd"))
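java.text.SimpleDateFormat is lenient by default, so out-of-range values such as month 13 can still parse. The page does not say whether Wharlord turns leniency off. The plain-Scala sketch below (the helper name is hypothetical) shows the JDK behaviour for both settings.

```scala
import java.text.{ParseException, SimpleDateFormat}

// Plain-Scala sketch: does a string parse with the given SimpleDateFormat pattern?
object DateFormatSketch {
  def parses(value: String, pattern: String, lenient: Boolean): Boolean = {
    val format = new SimpleDateFormat(pattern)
    format.setLenient(lenient) // SimpleDateFormat is lenient unless told otherwise
    try {
      format.parse(value)
      true
    } catch {
      case _: ParseException => false
    }
  }
}
```

For example, "2017-13-45" parses with "yyyy-MM-dd" in lenient mode (the date rolls over into 2018) but is rejected in strict mode. Also note that DateFormat.parse(String) may not consume the entire input, so trailing characters after a valid date are not necessarily rejected.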
Check whether the column with the given name can be converted to the given type.
def isConvertibleTo(columnName: String, targetType: DataType): Check
Check(transactions).isConvertibleTo("amount", DoubleType)
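Check whether the number of rows in the table satisfies the given condition. The condition is passed as a function that turns the row count column into a boolean column.
def hasNumRows(expected: (Column) => Column): Check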
Check(clicks).hasNumRows(_ > 10)
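The expectation is a predicate on the row count, and _ > 10 is shorthand for count => count > 10. Below is a plain-Scala analogue that uses Long => Boolean instead of Spark's Column => Column. The object and method are hypothetical illustrations.

```scala
// Plain-Scala sketch: the expectation is a predicate applied to the row count.
object NumRowsSketch {
  def hasNumRows[A](rows: Seq[A])(expected: Long => Boolean): Boolean =
    expected(rows.size.toLong)
}
```

With the Column-based signature, an exact count would be written with Column's equality operator, e.g. hasNumRows(_ === 10). Plain == would return a Boolean rather than a Column.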
Check whether the given columns are a unique key for this table.
def hasUniqueKey(columnName: String, columnNames: String*): Check
Check(connections).hasUniqueKey("time", "thread")
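Conceptually, a set of columns is a unique key when no combination of their values appears in more than one row. The plain-Scala sketch below groups in-memory rows (modelled as Maps, an assumption made for illustration) by the key columns and reports duplicates. It is not Wharlord's Spark implementation.

```scala
// Plain-Scala sketch of a unique-key test; rows are Maps from column name to value.
object UniqueKeySketch {
  type Row = Map[String, Any]

  // Returns every key combination that occurs more than once, with its count.
  // An empty result means the given columns form a unique key.
  def duplicateKeys(rows: Seq[Row], columnName: String, columnNames: String*): Map[Seq[Any], Int] = {
    val keyColumns = columnName +: columnNames
    rows
      .groupBy(row => keyColumns.map(row))
      .map { case (key, group) => key -> group.size }
      .filter { case (_, count) => count > 1 }
  }
}
```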
Check whether the columns with the given names define a foreign key to the specified reference table. Note that a foreign key needs to be a unique key in the reference table, which will also be checked.
def hasForeignKey(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
Check(contracts).hasForeignKey(customers, "customerId" -> "id")
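The two conditions described above can be sketched on in-memory rows. First, the referenced columns must be unique in the reference table. Second, every key in the base table must occur there. This plain-Scala model is illustrative only: it ignores null keys and is not the library's code.

```scala
// Plain-Scala sketch of a foreign-key test; rows are Maps from column name to value.
object ForeignKeySketch {
  type Row = Map[String, Any]

  // Each (baseColumn, referenceColumn) pair maps a base-table column to a reference-table column.
  def isForeignKey(base: Seq[Row], reference: Seq[Row], keyMap: (String, String), keyMaps: (String, String)*): Boolean = {
    val pairs = keyMap +: keyMaps
    val baseKeys = base.map(row => pairs.map { case (baseColumn, _) => row(baseColumn) })
    val referenceKeys = reference.map(row => pairs.map { case (_, referenceColumn) => row(referenceColumn) })
    // The referenced columns must form a unique key of the reference table ...
    val referenceIsUnique = referenceKeys.distinct.size == referenceKeys.size
    // ... and every key in the base table must exist in the reference table.
    val referenceKeySet = referenceKeys.toSet
    referenceIsUnique && baseKeys.forall(referenceKeySet.contains)
  }
}
```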
Check whether a join between this table and the given reference table returns any results. It will also output a percentage of matching keys between the base and the reference table.
def isJoinableWith(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
Check(contracts).isJoinableWith(customers, "customerId" -> "id")
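The sketch below models one plausible reading of this check in plain Scala. It counts the distinct key combinations in the base table and how many of them also occur in the reference table. The tables count as joinable if at least one does. The exact definition of the reported percentage is an assumption here, as are the helper names.

```scala
// Plain-Scala sketch of a joinability test; rows are Maps from column name to value.
object JoinableSketch {
  type Row = Map[String, Any]

  // Returns (distinct key combinations in the base table,
  //          how many of them also occur in the reference table).
  def matchingKeys(base: Seq[Row], reference: Seq[Row], keyMap: (String, String), keyMaps: (String, String)*): (Int, Int) = {
    val pairs = keyMap +: keyMaps
    val baseKeys = base.map(row => pairs.map { case (baseColumn, _) => row(baseColumn) }).toSet
    val referenceKeys = reference.map(row => pairs.map { case (_, referenceColumn) => row(referenceColumn) }).toSet
    (baseKeys.size, baseKeys.count(referenceKeys.contains))
  }

  // Joinable if at least one base key finds a partner.
  def isJoinable(counts: (Int, Int)): Boolean = counts._2 > 0

  // Percentage of distinct base keys that find a partner in the reference table.
  def matchingPercentage(counts: (Int, Int)): Double =
    if (counts._1 == 0) 0.0 else 100.0 * counts._2 / counts._1
}
```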
Check whether the columns in the dependent set have a functional dependency on the determinant set. This can be used to check a "foreign key" relationship in a denormalized table.
def hasFunctionalDependency(determinantSet: Seq[String], dependentSet: Seq[String]): Check
Check(records).hasFunctionalDependency(Seq("artist.id"), Seq("artist.name", "artist.country"))
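A functional dependency holds when every value of the determinant columns maps to exactly one combination of dependent values. The plain-Scala sketch below (rows modelled as Maps, names hypothetical) lists the determinant values that violate this. In the example data, artist 2 appears under two spellings of its name, which is the kind of inconsistency this check is meant to catch in a denormalized table.

```scala
// Plain-Scala sketch of a functional-dependency test; rows are Maps from column name to value.
object FunctionalDependencySketch {
  type Row = Map[String, Any]

  // Returns every determinant value that maps to more than one distinct combination
  // of dependent values. An empty result means the dependency holds.
  def violations(rows: Seq[Row], determinantSet: Seq[String], dependentSet: Seq[String]): Map[Seq[Any], Set[Seq[Any]]] =
    rows
      .groupBy(row => determinantSet.map(row))
      .map { case (determinant, group) => determinant -> group.map(row => dependentSet.map(row)).toSet }
      .filter { case (_, dependents) => dependents.size > 1 }
}
```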
In order to run a set of constraints, just execute the run method on the check. You can pass a list of reporters as well. If no reporter is passed, it will report to the console output stream using a console reporter.
def run(reporters: Reporter*): CheckResult
To report one or multiple check results to one or multiple reporters, use the Runner object. The runner executes all checks, reports the results to all reporters, and returns all results programmatically so you can use them for other purposes (e.g. unit testing).
val check1: Check = ???
val check2: Check = ???
val reporter1: Reporter = ???
val reporter2: Reporter = ???
val result = Runner.run(Seq(check1, check2), Seq(reporter1, reporter2))
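The runner's control flow can be illustrated with a toy model in plain Scala. Each check is evaluated once, each result goes to every reporter, and the results are returned for programmatic use. ToyResult, ToyReporter and CollectingReporter are hypothetical stand-ins, not Wharlord's CheckResult or Reporter types.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of a runner; all types here are hypothetical stand-ins, not Wharlord classes.
object RunnerSketch {
  final case class ToyResult(checkName: String, passed: Boolean)

  trait ToyReporter {
    def report(result: ToyResult): Unit
  }

  // A reporter that simply remembers everything it was given.
  final class CollectingReporter extends ToyReporter {
    val seen: ListBuffer[ToyResult] = ListBuffer.empty[ToyResult]
    def report(result: ToyResult): Unit = seen += result
  }

  // Evaluate every check exactly once, hand each result to every reporter,
  // and return the results keyed by check name.
  def run(checks: Seq[(String, () => Boolean)], reporters: Seq[ToyReporter]): Map[String, ToyResult] = {
    val results = checks.map { case (name, evaluate) => ToyResult(name, evaluate()) }
    for (result <- results; reporter <- reporters) reporter.report(result)
    results.map(result => result.checkName -> result).toMap
  }
}
```

Evaluating each check only once and fanning the result out to all reporters means that adding a reporter does not re-run the constraints on the data.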
The console reporter is a simple reporter meant for interactive usage (e.g. on the Spark shell). It prints checks and constraint results to the specified print stream, coloured by ANSI terminal markup.
ConsoleReporter(
stream: PrintStream = Console.out
)
The markdown reporter is another simple reporter, suitable for both interactive and non-interactive usage. It prints the constraint results to the specified print stream in markdown layout. If you want to store the markdown output in a file, you can wrap a FileOutputStream in a PrintStream.
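Wrapping a file in a print stream needs only the Java standard library. The sketch below opens such a stream through a hypothetical helper object. The commented lines show how it would combine with the run method and the MarkdownReporter constructor described on this page; the Wharlord import is omitted because its package is not given here.

```scala
import java.io.{File, FileOutputStream, PrintStream}

// Hypothetical helper: opens an auto-flushing UTF-8 PrintStream that writes into the given file.
object MarkdownFileSketch {
  def openReportStream(file: File): PrintStream =
    new PrintStream(new FileOutputStream(file), true, "UTF-8")
}

// With Wharlord on the classpath (import not shown), usage could look like:
//   val stream = MarkdownFileSketch.openReportStream(new File("report.md"))
//   Check(customers).isNeverNull("age").run(MarkdownReporter(stream))
//   stream.close()
```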
MarkdownReporter(stream: PrintStream)
The Zeppelin reporter can be used to show the results in a Zeppelin notebook note. Make sure to use exactly one ZeppelinReporter instance per note.
The log4j reporter is more sophisticated. It serializes all available information about the check, the constraints and the results into a JSON string and logs it to the specified logger using the specified level.
Log4jReporter(
logger: Logger = org.apache.log4j.Logger.getLogger("Wharlord"),
logLevel: Level = org.apache.log4j.Level.INFO
)