Skip to content

Commit

Permalink
Unavailable servers handling has been added.
Browse files Browse the repository at this point in the history
  • Loading branch information
nuald committed Nov 16, 2017
1 parent c5f43ba commit 7557c53
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 54 deletions.
113 changes: 83 additions & 30 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import akka.actor.ActorSystem
import akka.event.{LogSource, Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import analyzer.{Analyzer, Endpoint, HistoryWriter, Trainer}
import com.datastax.driver.core.{Cluster, HostDistance, PoolingOptions}
import com.datastax.driver.core.Cluster
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import dashboard._
import mqtt._
import lib._
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import redis.RedisClient

import scala.io.Source
import scala.util.{Failure, Success}

object Main extends App {
val conf = Config.get
Expand Down Expand Up @@ -43,6 +47,35 @@ object Main extends App {
.action((_, c) => c.copy(noLocalAnalyzer = true)).text("Don't use the local analyzer")
}

def getCassandraCluster(contactPoint: String)
(implicit logger: LoggingAdapter): Option[Cluster] = {
val cluster = Cluster.builder()
.addContactPoint(contactPoint)
.build()
try {
val clusterName = cluster.getMetadata.getClusterName
logger.info(s"Cassandra cluster: $clusterName")
Some(cluster)
} catch {
case e: Throwable =>
logger.error(e, "Cassandra cluster is not available")
None
}
}

def getConnectedMqtt(implicit logger: LoggingAdapter): Option[MqttClient] = {
val mqttClient = new MqttClient(conf.mqtt.broker,
MqttClient.generateClientId, new MemoryPersistence)
try {
mqttClient.connect()
Some(mqttClient)
} catch {
case e: Throwable =>
logger.error(e, "MQTT server is not available")
None
}
}

parser.parse(args, ScoptConfig()) match {
case Some(scoptConfig) =>
val akkaConfig = if (scoptConfig.akkaConfig.nonEmpty) {
Expand All @@ -54,38 +87,58 @@ object Main extends App {

implicit val system: ActorSystem = ActorSystem("cluster", akkaConfig)
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = system.dispatcher

val poolingOptions = new PoolingOptions()
poolingOptions
.setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
.setConnectionsPerHost(HostDistance.REMOTE, 2, 4)
val cluster = Cluster.builder()
.addContactPoint(scoptConfig.cassandraHost)
.withPoolingOptions(poolingOptions)
.build()
val cassandraActor = system.actorOf(CassandraActor.props(cluster), "cassandra-client")

val redisClient = RedisClient(scoptConfig.redisHost, conf.redis.port)

val analyzerOpt = if (scoptConfig.noLocalAnalyzer) None else
Some(system.actorOf(Analyzer.props(cassandraActor, redisClient), "analyzer"))

if (scoptConfig.isServer) {
system.actorOf(Producer.props(), "producer")
system.actorOf(Consumer.props(cluster), "consumer")

val endpoint = system.actorOf(Endpoint.props(analyzerOpt), "endpoint")
system.actorOf(Trainer.props(cassandraActor, redisClient), "trainer")

system.actorOf(HistoryWriter.props(cluster, redisClient, analyzerOpt), "history-writer")
system.actorOf(Dashboard.props(cassandraActor, endpoint), "dashboard")
// Setup logging
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
implicit val logger: LoggingAdapter = Logging(system, this)

def getRedisClient(host: String): Option[RedisClient] = {
val redisClient = RedisClient(host, conf.redis.port)
Await.ready(redisClient.ping(), 1.seconds).value.get match {
case Success(_) => Some(redisClient)
case Failure(e) =>
system.stop(redisClient.redisConnection)
logger.error(e, "Redis server is not available")
None
}
}

scala.sys.addShutdownHook {
system.terminate()
Await.result(system.whenTerminated, 5.seconds)
cluster.close()
getCassandraCluster(scoptConfig.cassandraHost) match {
case Some(cluster) =>
val cassandraActor = system.actorOf(CassandraActor.props(cluster), "cassandra-client")

getRedisClient(scoptConfig.redisHost) foreach { redisClient =>
val analyzerOpt = if (scoptConfig.noLocalAnalyzer) None else
Some(system.actorOf(Analyzer.props(cassandraActor, redisClient), "analyzer"))

if (scoptConfig.isServer) {
val endpoint = system.actorOf(Endpoint.props(analyzerOpt), "endpoint")
system.actorOf(Trainer.props(cassandraActor, redisClient), "trainer")

system.actorOf(HistoryWriter.props(cluster, redisClient, analyzerOpt), "history-writer")
system.actorOf(Dashboard.props(cassandraActor, endpoint), "dashboard")
}
}

if (scoptConfig.isServer) {
getConnectedMqtt foreach { mqttClient =>
system.actorOf(Producer.props(mqttClient), "producer")
system.actorOf(Consumer.props(mqttClient, cluster), "consumer")
}
}

scala.sys.addShutdownHook {
system.terminate()
Await.result(system.whenTerminated, 5.seconds)
cluster.close()
}
case None => system.terminate()
}

case None =>
// arguments are bad, error message will have been displayed
}
Expand Down
19 changes: 6 additions & 13 deletions src/main/scala/mqtt/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.actor._
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.querybuilder.QueryBuilder
import lib._
Expand All @@ -13,14 +12,14 @@ import mqtt.Producer.MqttEntry
import scala.concurrent.ExecutionContext

object Consumer {
def props(cluster: Cluster)
def props(mqttClient: MqttClient, cluster: Cluster)
(implicit materializer: ActorMaterializer) =
Props(classOf[Consumer], cluster, materializer)
Props(classOf[Consumer], mqttClient, cluster, materializer)

final case class Arrived(message: MqttMessage)
}

class Consumer(cluster: Cluster)
class Consumer(mqttClient: MqttClient, cluster: Cluster)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Consumer._
Expand All @@ -32,15 +31,9 @@ class Consumer(cluster: Cluster)
private val conf = Config.get
private val session = cluster.connect(conf.cassandra.keyspace)

val client = new MqttClient(
conf.mqtt.broker,
MqttClient.generateClientId,
new MemoryPersistence
)
client.connect()
client.subscribe(conf.mqtt.topic)
mqttClient.subscribe(conf.mqtt.topic)

client.setCallback(new MqttCallback {
mqttClient.setCallback(new MqttCallback {
override def messageArrived(topic: String, message: MqttMessage): Unit = {
self ! Arrived(message)
}
Expand All @@ -54,7 +47,7 @@ class Consumer(cluster: Cluster)
})

override def postStop(): Unit = {
client.disconnect()
mqttClient.disconnect()
session.close()
}

Expand Down
18 changes: 7 additions & 11 deletions src/main/scala/mqtt/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import org.clapper.scalasti.ST
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import lib._

import scala.collection.JavaConverters._
Expand All @@ -18,16 +17,18 @@ import scala.io.Source
import scala.util.{Failure, Success}

object Producer {
def props()(implicit materializer: ActorMaterializer) =
Props(classOf[Producer], materializer)
def props(mqttClient: MqttClient)
(implicit materializer: ActorMaterializer) =
Props(classOf[Producer], mqttClient, materializer)

final case class MqttEntry(sensor: String, value: Double, anomaly: Int)
final case class SensorModel(name: String, isNormal: Boolean)

private final case object Tick
}

class Producer()(implicit materializer: ActorMaterializer)
class Producer(mqttClient: MqttClient)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Producer._

Expand All @@ -36,12 +37,7 @@ class Producer()(implicit materializer: ActorMaterializer)
implicit val logger: LoggingAdapter = log

private val conf = Config.get
val client = new MqttClient(conf.mqtt.broker,
MqttClient.generateClientId,
new MemoryPersistence
)
client.connect()
private val msgTopic = client.getTopic(conf.mqtt.topic)
private val msgTopic = mqttClient.getTopic(conf.mqtt.topic)

private val sensors = conf.mqtt.sensors.asScala
private var state = sensors.map(k => (k, "normal")).toMap
Expand Down Expand Up @@ -82,7 +78,7 @@ class Producer()(implicit materializer: ActorMaterializer)
case Some(x) => x.unbind
case None =>
}
client.disconnect()
mqttClient.disconnect()
}

override def receive: Receive = {
Expand Down

0 comments on commit 7557c53

Please sign in to comment.