Skip to content

Commit

Permalink
Merge pull request #127 from FRosner/issue/123
Browse files Browse the repository at this point in the history
EmailReporter (Issue/123)
  • Loading branch information
FRosner authored Jan 31, 2017
2 parents e422082 + 066c4d7 commit 7e1482a
Show file tree
Hide file tree
Showing 12 changed files with 1,135 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Check(contracts)

### Custom Reporters

By default the check result will be printed to stdout using ANSI escape codes to highlight the output. To have a report in another format, you can specify one or more custom reporters.
By default the check result will be printed to stdout using ANSI escape codes to highlight the output. To have a report in another format, you can specify one or more [custom reporters](https://github.com/FRosner/drunken-data-quality/wiki).

```scala
import de.frosner.ddq.reporters.MarkdownReporter
Expand Down
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion.value %

libraryDependencies += "org.mockito" % "mockito-all" % "1.8.4" % "test"

resolvers += "lightshed-maven" at "http://dl.bintray.com/content/lightshed/maven"

libraryDependencies += "ch.lightshed" %% "courier" % "0.1.4"

libraryDependencies += "org.jvnet.mock-javamail" % "mock-javamail" % "1.9" % "test"

spName := "FRosner/drunken-data-quality"

spAppendScalaVersion := true
Expand Down
6 changes: 6 additions & 0 deletions python/pyddq/jvm_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ def simple_date_format(jvm, s):

def tuple2(jvm, t):
    """
    Build a scala.Tuple2 from a Python pair.
    Args:
        jvm: JVM gateway object exposing the ``scala`` package
            (presumably a py4j JVM view -- confirm against callers)
        t: iterable whose items are unpacked as the Tuple2 constructor
            arguments
    Returns:
        The object produced by calling ``jvm.scala.Tuple2`` with the
        unpacked items of ``t``
    """
    tuple2_constructor = jvm.scala.Tuple2
    return tuple2_constructor(*t)

def option(jvm, java_obj):
    """
    Wrap an object into a scala.Option.
    Args:
        jvm: JVM gateway object exposing the ``scala`` package
        java_obj: value handed to ``scala.Option.apply``
    Returns:
        The result of ``jvm.scala.Option.apply(java_obj)``
    """
    scala_option = jvm.scala.Option
    return scala_option.apply(java_obj)

def scala_none(jvm):
    """
    Fetch the scala.None singleton.
    Args:
        jvm: JVM gateway object exposing the ``scala`` package
    Returns:
        The ``MODULE$`` attribute of ``jvm.scala.None$``
    """
    # "None$" and "MODULE$" are not valid Python identifiers, so plain
    # attribute syntax cannot be used; getattr is required for both lookups.
    none_class = getattr(jvm.scala, "None$")
    return getattr(none_class, "MODULE$")
140 changes: 137 additions & 3 deletions python/pyddq/reporters.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from pyddq.streams import PrintStream, OutputStream, ByteArrayOutputStream
from py4j.java_gateway import get_field
import jvm_conversions as jc


class Reporter(object):
    """
    Base class of the reporters defined in this module. Subclasses are
    expected to override get_jvm_reporter.
    """

    def get_jvm_reporter(self, jvm, *args, **kwargs):
        """
        Return the JVM counterpart of this reporter.
        Args:
            jvm: JVM gateway object used to build the JVM reporter
        Raises:
            NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

class OutputStreamReporter(Reporter):
    """
    Base class of reporters that write their report to an output stream.
    Args:
        output_stream (OutputStream): stream the report is written to
    Raises:
        TypeError: if output_stream is not an instance of
            pyddq.streams.OutputStream
    """

    def __init__(self, output_stream):
        if isinstance(output_stream, OutputStream):
            self.output_stream = output_stream
        else:
            raise TypeError("output_stream should be a subclass of pyddq.streams.OutputStream")


class MarkdownReporter(Reporter):
class MarkdownReporter(OutputStreamReporter):
"""
A class which produces a markdown report of core.Check.run
Args:
Expand All @@ -25,7 +27,7 @@ def get_jvm_reporter(self, jvm):
)


class ConsoleReporter(Reporter):
class ConsoleReporter(OutputStreamReporter):
"""
A class which produces a console report of core.Check.run
Args:
Expand All @@ -38,7 +40,7 @@ def get_jvm_reporter(self, jvm):
)


class ZeppelinReporter(Reporter):
class ZeppelinReporter(OutputStreamReporter):
"""
A class which produces a report of core.Check.run in a Zeppelin notebook note.
Args:
Expand All @@ -56,3 +58,135 @@ def get_jvm_reporter(self, jvm):
return jvm.de.frosner.ddq.reporters.ZeppelinReporter(
jvm_print_stream
)

class EmailReporter(Reporter):
    """
    A class which produces an HTML report of [[CheckResult]] and sends it to the
    configured SMTP server.

    Every optional argument that is left as None is replaced by the default
    value of the corresponding parameter of the JVM EmailReporter when the JVM
    reporter is created (see get_jvm_reporter).
    Args:
        smtpServer (str): URL of the SMTP server to use for sending the email
        to (Set[str]): Email addresses of the receivers
        cc (Set[str]): Email addresses of the carbon copy receivers. Defaults to
            an empty set.
        subjectPrefix (str): Prefix to put in the email subject. Defaults to
            "Data Quality Report: ".
        smtpPort (int): Port of the SMTP server to use for sending the email.
            Defaults to 25.
        from_ (str): Email address of the sender. Defaults to "mail.ddq.io".
        usernameAndPassword (Tuple[str, str]): Optional credentials. Defaults
            to None.
        reportOnlyOnFailure (bool): Whether to report only if there is a failing
            check (True) or always (False). Defaults to False.
        accumulatedReport (bool): Whether to report for each check result (False)
            or only when a report is triggered (True). The accumulated option
            requires the reporter to stick around until manually triggered or
            else you will lose the results. Defaults to False.
    """

    def __init__(self, smtpServer, to, cc=None, subjectPrefix=None,
                 smtpPort=None, from_=None, usernameAndPassword=None,
                 reportOnlyOnFailure=None, accumulatedReport=None):
        self._smtpServer = smtpServer
        self._to = to
        self._cc = cc
        self._subjectPrefix = subjectPrefix
        self._smtpPort = smtpPort
        self._from_ = from_
        self._usernameAndPassword = usernameAndPassword
        self._reportOnlyOnFailure = reportOnlyOnFailure
        self._accumulatedReport = accumulatedReport
        # Both are filled in by the first call to get_jvm_reporter.
        self._jvm = None
        self._jvm_reporter = None

    @property
    def smtpServer(self):
        return self._smtpServer

    @property
    def to(self):
        return self._to

    @property
    def cc(self):
        return self._cc

    @property
    def subjectPrefix(self):
        return self._subjectPrefix

    @property
    def smtpPort(self):
        return self._smtpPort

    @property
    def from_(self):
        return self._from_

    @property
    def usernameAndPassword(self):
        return self._usernameAndPassword

    @property
    def reportOnlyOnFailure(self):
        return self._reportOnlyOnFailure

    @property
    def accumulatedReport(self):
        return self._accumulatedReport

    @staticmethod
    def _or_scala_default(jvm_email_reporter, value, position, convert=None):
        """
        Resolve one optional constructor argument.
        Args:
            jvm_email_reporter: the JVM EmailReporter companion
            value: value given by the user, None meaning "not given"
            position (int): 1-based position of the parameter in the JVM
                EmailReporter.apply signature
            convert (callable): optional conversion applied to a given value
        Returns:
            The JVM-side default when value is None, otherwise the (possibly
            converted) value
        """
        if value is None:
            # Default arguments of apply are reachable as apply$default$<N>.
            return getattr(jvm_email_reporter, "apply$default$%d" % position)()
        if convert is None:
            return value
        return convert(value)

    def get_jvm_reporter(self, jvm):
        """
        Return the JVM EmailReporter backing this reporter.

        The JVM reporter is created on the first call and cached; later calls
        return the cached instance regardless of the jvm passed in.
        Args:
            jvm: JVM gateway object used to create the JVM reporter
        Returns:
            The JVM EmailReporter instance
        """
        # Identity check: only "not created yet" must trigger the creation.
        if self._jvm_reporter is None:
            self._jvm = jvm
            jvm_email_reporter = jvm.de.frosner.ddq.reporters.EmailReporter
            resolve = self._or_scala_default

            def to_scala_set(addresses):
                return jc.iterable_to_scala_set(jvm, addresses)

            def to_scala_credentials(credentials):
                return jc.option(jvm, jc.tuple2(jvm, credentials))

            to = to_scala_set(self.to)
            cc = resolve(jvm_email_reporter, self.cc, 3, to_scala_set)
            subjectPrefix = resolve(jvm_email_reporter, self.subjectPrefix, 4)
            smtpPort = resolve(jvm_email_reporter, self.smtpPort, 5)
            from_ = resolve(jvm_email_reporter, self.from_, 6)
            usernameAndPassword = resolve(
                jvm_email_reporter, self.usernameAndPassword, 7,
                to_scala_credentials
            )
            reportOnlyOnFailure = resolve(
                jvm_email_reporter, self.reportOnlyOnFailure, 8
            )
            accumulatedReport = resolve(
                jvm_email_reporter, self.accumulatedReport, 9
            )

            self._jvm_reporter = jvm_email_reporter(
                self.smtpServer, to, cc, subjectPrefix, smtpPort, from_,
                usernameAndPassword, reportOnlyOnFailure, accumulatedReport
            )

        return self._jvm_reporter

    def sendAccumulatedReport(self, accumulatedCheckName=None):
        """
        Ask the JVM reporter to send the accumulated report.
        Args:
            accumulatedCheckName (str): Optional name handed to the JVM
                reporter as a scala.Option; None is passed as scala.None.
        Raises:
            ValueError: if the JVM reporter has not been created yet, i.e.
                get_jvm_reporter has not been called
        """
        if self._jvm_reporter is None:
            raise ValueError("No checks executed. Please run your checks before sending out a report.")

        if accumulatedCheckName is None:
            accumulatedCheckName = jc.scala_none(self._jvm)
        else:
            accumulatedCheckName = jc.option(self._jvm, accumulatedCheckName)

        self._jvm_reporter.sendAccumulatedReport(accumulatedCheckName)
39 changes: 38 additions & 1 deletion python/tests/integration/test_reporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pyspark.sql import SQLContext

from pyddq.core import Check
from pyddq.reporters import ConsoleReporter, MarkdownReporter, ZeppelinReporter
from pyddq.reporters import ConsoleReporter, MarkdownReporter, ZeppelinReporter, EmailReporter
from pyddq.streams import ByteArrayOutputStream


Expand Down Expand Up @@ -89,6 +89,43 @@ def test_output(self):
""".strip()
self.assertEqual(baos.get_output(), expected_output)

class EmailReporterTest(unittest.TestCase):
    """
    Integration tests running checks with an EmailReporter attached. The tests
    only verify that running (and triggering) the reporter does not raise.
    """

    def setUp(self):
        self.sc = SparkContext()
        self.sql = SQLContext(self.sc)
        rows = [(1, "a"), (1, None), (3, "c")]
        self.df = self.sql.createDataFrame(rows)

    def tearDown(self):
        self.sc.stop()

    def _unique_key_check(self):
        # Check shared by all tests: two unique key constraints on self.df.
        return Check(self.df).hasUniqueKey("_1").hasUniqueKey("_1", "_2")

    def test_default_arguments(self):
        check = self._unique_key_check()
        reporter = EmailReporter("[email protected]", {"[email protected]"})
        check.run([reporter])

    def test_passed_arguments(self):
        check = self._unique_key_check()
        reporter = EmailReporter(
            "[email protected]",          # smtpServer
            {"[email protected]"},        # to
            {"[email protected]"},        # cc
            "my subject prefix: ",       # subjectPrefix
            9000,                        # smtpPort
            "test.ddq.io",               # from_
            ("username", "password"),    # usernameAndPassword
            True,                        # reportOnlyOnFailure
            True                         # accumulatedReport
        )
        check.run([reporter])

    def test_accumulated_report(self):
        check = self._unique_key_check()
        reporter = EmailReporter("[email protected]", {"[email protected]"}, accumulatedReport=True)
        check.run([reporter])
        # Trigger once without and once with an accumulated check name.
        reporter.sendAccumulatedReport()
        reporter.sendAccumulatedReport("111")

if __name__ == '__main__':
unittest.main()
29 changes: 29 additions & 0 deletions python/tests/unit/test_reporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,34 @@ def test_get_jvm_reporter(self):
jvm.java.io.PrintStream.assert_called_with(output_stream)


class EmailReporter(unittest.TestCase):
    """
    Unit tests of r.EmailReporter against a mocked JVM.
    """

    def _mocked_jvm(self):
        # Returns the mocked JVM together with the reporter instance that the
        # mocked JVM EmailReporter constructor hands out.
        jvm = Mock()
        ddq_email_reporter = Mock()
        jvm.de.frosner.ddq.reporters.EmailReporter = Mock(
            return_value=ddq_email_reporter
        )
        return jvm, ddq_email_reporter

    def test_get_jvm_reporter(self):
        jvm, ddq_email_reporter = self._mocked_jvm()
        reporter = r.EmailReporter("smtp", "to")
        self.assertEqual(reporter.get_jvm_reporter(jvm), ddq_email_reporter)

    def test_send_accumulated_report(self):
        jvm, ddq_email_reporter = self._mocked_jvm()
        reporter = r.EmailReporter("smtp", "to")

        # Sending before any JVM reporter exists must fail.
        with self.assertRaises(ValueError):
            reporter.sendAccumulatedReport()

        reporter.get_jvm_reporter(jvm)  # usually called by Check.run
        reporter.sendAccumulatedReport()

        self.assertTrue(ddq_email_reporter.sendAccumulatedReport.called)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import de.frosner.ddq.core.CheckResult
* @param stream The [[java.io.PrintStream]] to put the output. The stream will not be closed internally and can
* be reused.
**/
case class ConsoleReporter(stream: PrintStream = Console.out) extends PrintStreamReporter {
case class ConsoleReporter(stream: PrintStream = Console.out) extends HumanReadableReporter {

/**
* Output console report of a given checkResult to the stream passed to the constructor
Expand Down
Loading

0 comments on commit 7e1482a

Please sign in to comment.