Email Microservice #3095

Merged · 23 commits · Oct 8, 2019
14 changes: 14 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,14 @@
version = "2.0.1"
maxColumn = 140
align = most
continuationIndent.defnSite = 2
assumeStandardLibraryStripMargin = true
docstrings = ScalaDoc
includeCurlyBraceInSelectChains = false
danglingParentheses = true
spaces {
inImportCurlyBraces = true
}
optIn.annotationNewlines = true

rewrite.rules = [SortImports, RedundantBraces, PreferCurlyFors]
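With formatting now driven by this shared .scalafmt.conf (the per-project scalafmtSettings are dropped from build.sbt below), formatting becomes an explicit sbt task rather than something run on compile. A minimal sketch, assuming the scalameta sbt-scalafmt plugin is the replacement; the plugin coordinates and version are assumptions, not shown in this diff:

// project/plugins.sbt (assumed, not part of this diff)
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.1")

// then, from the shell: format everything, or verify formatting in CI
//   sbt scalafmtAll
//   sbt scalafmtCheckAll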
47 changes: 27 additions & 20 deletions build.sbt
@@ -1,6 +1,5 @@
import Dependencies._
import BuildSettings._
import com.lucidchart.sbt.scalafmt.ScalafmtCorePlugin.autoImport._
import sbtassembly.AssemblyPlugin.autoImport.assemblyMergeStrategy

lazy val commonDeps = Seq(logback, scalaTest, scalaCheck)
@@ -54,16 +53,11 @@ lazy val akkaPersistenceDeps =

lazy val akkaHttpDeps =
Seq(akkaHttp, akkaHttp2, akkaHttpTestkit, akkaStreamsTestKit, akkaHttpCirce)
lazy val circeDeps = Seq(circe, circeGeneric, circeParser)
lazy val circeDeps = Seq(circe, circeGeneric, circeParser)
lazy val enumeratumDeps = Seq(enumeratum, enumeratumCirce)

lazy val slickDeps = Seq(slick, slickHikaryCP, postgres, h2)

lazy val scalafmtSettings = Seq(
scalafmtOnCompile in ThisBuild := true,
scalafmtTestOnCompile in ThisBuild := true
)

lazy val dockerSettings = Seq(
Docker / maintainer := "Hmda-Ops",
dockerBaseImage := "openjdk:8-jre-alpine3.9",
@@ -75,7 +69,7 @@ lazy val packageSettings = Seq(
mappings in Universal := {
// universalMappings: Seq[(File,String)]
val universalMappings = (mappings in Universal).value
val fatJar = (assembly in Compile).value
val fatJar = (assembly in Compile).value
// removing means filtering
val filtered = universalMappings filter {
case (_, fileName) => !fileName.endsWith(".jar")
@@ -139,7 +133,6 @@ lazy val `hmda-spark-reporting` = (project in file("hmda-spark-reporting"))
}
),
Seq(libraryDependencies ++= sparkDeps ++ circeDeps ++ akkaDeps),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -165,7 +158,6 @@ lazy val `hmda-platform` = (project in file("hmda"))
oldStrategy(x)
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -193,7 +185,6 @@ lazy val `check-digit` = (project in file("check-digit"))
oldStrategy(x)
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -221,7 +212,6 @@ lazy val `institutions-api` = (project in file("institutions-api"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -248,7 +238,6 @@ lazy val `hmda-data-publisher` = (project in file("hmda-data-publisher"))
oldStrategy(x)
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -276,7 +265,6 @@ lazy val `ratespread-calculator` = (project in file("ratespread-calculator"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -304,7 +292,6 @@ lazy val `modified-lar` = (project in file("modified-lar"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -333,7 +320,6 @@ lazy val `irs-publisher` = (project in file("irs-publisher"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -362,7 +348,6 @@ lazy val `hmda-reporting` = (project in file("hmda-reporting"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -400,7 +385,6 @@ lazy val `hmda-analytics` = (project in file("hmda-analytics"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -428,7 +412,6 @@ lazy val `rate-limit` = (project in file("rate-limit"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
@@ -457,8 +440,32 @@ lazy val `data-browser` = (project in file("data-browser"))
s"${name.value}.jar"
}
),
scalafmtSettings,
dockerSettings,
packageSettings
)
.dependsOn(common % "compile->compile;test->test")

lazy val `email-service` = (project in file("email-service"))
.enablePlugins(JavaServerAppPackaging, sbtdocker.DockerPlugin, AshScriptPlugin)
.settings(hmdaBuildSettings: _*)
.settings(
Seq(
mainClass in Compile := Some("hmda.publication.lar.EmailReceiptApp"),
assemblyMergeStrategy in assembly := {
case "application.conf" => MergeStrategy.concat
case "META-INF/io.netty.versions.properties" => MergeStrategy.concat
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
},
assemblyJarName in assembly := {
s"${name.value}.jar"
},
libraryDependencies ++= monix :: akkaKafkaStreams :: awsSesSdk :: logback :: Nil
),
dockerSettings,
packageSettings
)
.dependsOn(common % "compile->compile;test->test")
.dependsOn(`hmda-protocol`)
.dependsOn(common)
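The new module pulls in Monix, Alpakka Kafka (akkaKafkaStreams) and the AWS SES SDK, with hmda.publication.lar.EmailReceiptApp as the entry point. A rough sketch of what such a consumer-to-SES flow can look like follows; it is an illustration only, not the code added in this PR: the object name, bootstrap servers, sender address and message text are assumptions, the topic and group names mirror the application.conf defaults further down, it assumes the v2 SES async client, and it uses plain Futures where the real service depends on Monix.

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.kafka.{ CommitterSettings, ConsumerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import org.apache.kafka.common.serialization.StringDeserializer
import software.amazon.awssdk.services.ses.SesAsyncClient
import software.amazon.awssdk.services.ses.model.{ Body, Content, Destination, Message, SendEmailRequest }

import scala.compat.java8.FutureConverters._ // scala-java8-compat, pulled in transitively by Akka
import scala.concurrent.{ ExecutionContext, Future }

object EmailReceiptSketch extends App {
  implicit val system: ActorSystem    = ActorSystem("email-service")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext   = system.dispatcher

  private val ses = SesAsyncClient.create()

  // Consume filer addresses published to the email topic; the group id matches the emailGroup default.
  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumption
      .withGroupId("email-group")

  private def sendReceipt(to: String): Future[_] = {
    val request = SendEmailRequest
      .builder()
      .source("no-reply@example.com") // assumption
      .destination(Destination.builder().toAddresses(to).build())
      .message(
        Message
          .builder()
          .subject(Content.builder().data("HMDA filing receipt").build())
          .body(Body.builder().text(Content.builder().data("Your submission has been signed.").build()).build())
          .build()
      )
      .build()
    ses.sendEmail(request).toScala
  }

  // Commit the Kafka offset only after SES accepts the message, giving at-least-once delivery.
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("hmda-email"))
    .mapAsync(parallelism = 1)(msg => sendReceipt(msg.record.value()).map(_ => msg.committableOffset))
    .runWith(Committer.sink(CommitterSettings(system)))
}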
@@ -1,19 +1,14 @@
package hmda.uli.api.grpc

import akka.stream.Materializer
import hmda.grpc.services.{
CheckDigitService,
ValidUliRequest,
ValidUliResponse,
}
import hmda.grpc.services.{ CheckDigitService, ValidUliRequest, ValidUliResponse }
import hmda.uli.validation.ULI._
import scala.concurrent.Future

class CheckDigitServiceImpl(materializer: Materializer)
extends CheckDigitService {
class CheckDigitServiceImpl(materializer: Materializer) extends CheckDigitService {

override def validateUli(in: ValidUliRequest): Future[ValidUliResponse] = {
val uli = in.uli
val uli = in.uli
val isValid = validateULI(uli)
Future.successful(ValidUliResponse(isValid))
}
@@ -99,5 +99,6 @@ message VerifyMacroMessage {
message SignSubmissionMessage {
SubmissionIdMessage submissionId = 1;
string replyTo = 2;
string email = 3;
}

4 changes: 4 additions & 0 deletions common/src/main/resources/application.conf
@@ -58,8 +58,12 @@ hmda {
disclosureTopic = ${?KAFKA_DISCLOSURE_TOPIC}
adTopic = "hmda-spark-ad"
adTopic = ${?KAFKA_AD_TOPIC}
emailTopic = "hmda-email"
emailTopic = ${?KAFKA_AD_TOPIC}
Contributor: KAFKA_EMAIL_TOPIC?
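Presumably the override was meant to use its own environment variable rather than reusing KAFKA_AD_TOPIC; the likely intent (an assumption, not part of this diff) is:

emailTopic = "hmda-email"
emailTopic = ${?KAFKA_EMAIL_TOPIC}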

}
groups {
emailGroup = "email-group"
emailGroup = ${?KAFKA_EMAIL_GROUP}
modifiedLarGroup = "modified-lar-group"
modifiedLarGroup = ${?KAFKA_MODIFIED_LAR_GROUP}
analyticsGroup = "analytics-group"
6 changes: 2 additions & 4 deletions common/src/main/scala/hmda/actor/HmdaActor.scala
@@ -5,13 +5,11 @@ import hmda.messages.CommonMessages.StopActor

abstract class HmdaActor extends Actor with ActorLogging {

override def preStart(): Unit = {
override def preStart(): Unit =
log.debug(s"Actor started at ${self.path}")
}

override def postStop(): Unit = {
override def postStop(): Unit =
log.debug(s"Actor stopped at ${self.path}")
}

override def receive: Receive = {
case StopActor => context stop self
16 changes: 5 additions & 11 deletions common/src/main/scala/hmda/actor/HmdaTypedActor.scala
@@ -1,13 +1,9 @@
package hmda.actor

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{
ClusterSharding,
Entity,
EntityTypeKey
}
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
@@ -23,10 +19,10 @@ trait HmdaTypedActor[A] {
def behavior(entityId: String): Behavior[A]

protected def supervisedBehavior(entityId: String): Behavior[A] = {
val config = ConfigFactory.load()
val config = ConfigFactory.load()
val minBackOff = config.getInt("hmda.supervisor.minBackOff")
val maxBackOff = config.getInt("hmda.supervisor.maxBackOff")
val rFactor = config.getDouble("hmda.supervisor.randomFactor")
val rFactor = config.getDouble("hmda.supervisor.randomFactor")

val supervisorStrategy = SupervisorStrategy.restartWithBackoff(
minBackoff = minBackOff.seconds,
@@ -39,13 +35,11 @@ trait HmdaTypedActor[A] {
.onFailure(supervisorStrategy)
}

def startShardRegion(sharding: ClusterSharding)(
implicit tag: ClassTag[A]): ActorRef[ShardingEnvelope[A]] = {
def startShardRegion(sharding: ClusterSharding)(implicit tag: ClassTag[A]): ActorRef[ShardingEnvelope[A]] =
sharding.init(
Entity(
typeKey = typeKey,
createBehavior = ctx => supervisedBehavior(ctx.entityId)
)
)
}
}
4 changes: 2 additions & 2 deletions common/src/main/scala/hmda/api/HmdaServer.scala
@@ -2,13 +2,13 @@ package hmda.api

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorSystem, Status}
import akka.actor.{ Actor, ActorSystem, Status }
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.ActorMaterializer
import hmda.actor.HmdaActor

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ ExecutionContext, Future }

trait HmdaServer extends HmdaActor {
val name: String
2 changes: 1 addition & 1 deletion common/src/main/scala/hmda/api/grpc/GrpcServer.scala
@@ -1,6 +1,6 @@
package hmda.api.grpc

import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import hmda.api.HmdaServer

import scala.concurrent.Future
37 changes: 15 additions & 22 deletions common/src/main/scala/hmda/api/http/CensusRecordsRetriever.scala
@@ -2,22 +2,19 @@ package hmda.publication.lar.services

import akka.NotUsed
import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.common.{
EntityStreamingSupport,
JsonEntityStreamingSupport
}
import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.scaladsl.{ Keep, Sink, Source }
import hmda.model.census.Census
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ ExecutionContext, Future }
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import hmda.query.dtos.IndexedCensusEntry

sealed trait CensusType
case object Tract extends CensusType
case object County extends CensusType
case object Tract extends CensusType
case object County extends CensusType
case object SmallCounty extends CensusType

object CensusType {
@@ -30,36 +27,32 @@ object CensusType {
}
}

case class IndexedCensusMaps(tract: Map[String, Census],
county: Map[String, Census],
smallCounty: Map[String, Census])
case class IndexedCensusMaps(tract: Map[String, Census], county: Map[String, Census], smallCounty: Map[String, Census])

class CensusRecordsRetriever(httpClient: HttpExt, censusApiUrl: String) {
private implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
EntityStreamingSupport
.json()
.withParallelMarshalling(parallelism = 4, unordered = true)

def downloadCensusMap(dataType: CensusType)(
implicit mat: ActorMaterializer,
ec: ExecutionContext): Future[Map[String, Census]] = {
def downloadCensusMap(dataType: CensusType)(implicit mat: ActorMaterializer, ec: ExecutionContext): Future[Map[String, Census]] = {
println(censusApiUrl + s"/streaming/${dataType.render}")
httpClient
.singleRequest(
HttpRequest(uri = censusApiUrl + s"/streaming/${dataType.render}"))
.singleRequest(HttpRequest(uri = censusApiUrl + s"/streaming/${dataType.render}"))
.flatMap { response =>
val responseWithoutSizeLimit =
response.copy(entity = response.entity.withoutSizeLimit())
Unmarshal(responseWithoutSizeLimit)
.to[Source[IndexedCensusEntry, NotUsed]]
}
.flatMap(indexedCensusStream =>
indexedCensusStream
.toMat(Sink.fold(Map.empty[String, Census]) {
(acc, next: IndexedCensusEntry) =>
.flatMap(
indexedCensusStream =>
indexedCensusStream
.toMat(Sink.fold(Map.empty[String, Census]) { (acc, next: IndexedCensusEntry) =>
acc + (next.index -> next.data)
})(Keep.right)
.run())
})(Keep.right)
.run()
)
}

}
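For orientation, a hypothetical call site for the retriever reformatted above; the ActorSystem name and census API URL are assumptions, everything else follows the constructor and method signatures in this file:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import hmda.model.census.Census
import hmda.publication.lar.services.{ CensusRecordsRetriever, Tract }

import scala.concurrent.{ ExecutionContext, Future }

object CensusRetrieverSketch extends App {
  implicit val system: ActorSystem    = ActorSystem("census-client") // assumed name
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext   = system.dispatcher

  // Stream the tract-indexed census entries and fold them into a lookup map.
  val retriever = new CensusRecordsRetriever(Http(), "http://census-api:9091") // assumed URL
  val tracts: Future[Map[String, Census]] = retriever.downloadCensusMap(Tract)
}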