Skip to content

Commit

Permalink
Compile warnings have been fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
nuald committed Dec 10, 2020
1 parent 15aec5e commit 07da30c
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 52 deletions.
2 changes: 0 additions & 2 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import akka.actor.ActorSystem
import akka.event.{LogSource, Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import analyzer.{Analyzer, Endpoint, HistoryWriter, Trainer}
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -88,7 +87,6 @@ object Main extends App {
}

implicit val system: ActorSystem = ActorSystem("cluster", akkaConfig)
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = system.dispatcher

// Setup logging
Expand Down
9 changes: 3 additions & 6 deletions src/main/scala/analyzer/Analyzer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp}
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.util.Timeout
import lib.{BinarySerializer, CassandraClient, Config, Entry}
import redis.RedisClient
Expand All @@ -31,9 +30,8 @@ final case class SensorMeta(
final case class AllMeta(entries: List[SensorMeta])

object Analyzer {
def props(cassandraClient: CassandraClient, redisClient: RedisClient)
(implicit materializer: ActorMaterializer) =
Props(classOf[Analyzer], cassandraClient, redisClient, materializer)
def props(cassandraClient: CassandraClient, redisClient: RedisClient) =
Props(classOf[Analyzer], cassandraClient, redisClient)

// ANCHOR: withHeuristic begin

Expand All @@ -49,7 +47,7 @@ object Analyzer {
val avg = history.sum / size

def sqrDiff(x: Double) = (x - avg) * (x - avg)
val stdDev = math.sqrt((0.0 /: history)(_ + sqrDiff(_)) / size)
val stdDev = math.sqrt(history.foldLeft(0.0)(_ + sqrDiff(_)) / size)

val valueDev = math.abs(value - avg)
val anomaly = (valueDev - stdDev) / (2 * stdDev)
Expand Down Expand Up @@ -86,7 +84,6 @@ object Analyzer {
}

class Analyzer(cassandraClient: CassandraClient, redisClient: RedisClient)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {

private[this] val akkaCluster = Cluster(context.system)
Expand Down
7 changes: 2 additions & 5 deletions src/main/scala/analyzer/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import lib._

Expand All @@ -14,15 +13,13 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

object Endpoint {
def props(analyzerOpt: Option[ActorRef])
(implicit materializer: ActorMaterializer) =
Props(classOf[Endpoint], analyzerOpt, materializer)
def props(analyzerOpt: Option[ActorRef]) =
Props(classOf[Endpoint], analyzerOpt)

final case object Stats
}

class Endpoint(analyzerOpt: Option[ActorRef])
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Endpoint._

Expand Down
15 changes: 7 additions & 8 deletions src/main/scala/analyzer/HistoryWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package analyzer
import akka.actor._
import akka.pattern.ask
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.querybuilder.QueryBuilder._
Expand All @@ -18,15 +17,13 @@ import scala.concurrent.duration._
import scala.util.{Failure, Success}

object HistoryWriter {
def props(session: CqlSession, redisClient: RedisClient, analyzerOpt: Option[ActorRef])
(implicit materializer: ActorMaterializer) =
Props(classOf[HistoryWriter], session, redisClient, analyzerOpt, materializer)
def props(session: CqlSession, redisClient: RedisClient, analyzerOpt: Option[ActorRef]) =
Props(classOf[HistoryWriter], session, redisClient, analyzerOpt)

private final case object Tick
}

class HistoryWriter(session: CqlSession, redisClient: RedisClient, analyzerOpt: Option[ActorRef])
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import HistoryWriter._

Expand Down Expand Up @@ -107,7 +104,9 @@ class HistoryWriter(session: CqlSession, redisClient: RedisClient, analyzerOpt:
}
}

system.scheduler.schedule(0.millis, conf.historyWriter.period.millis) {
self ! Tick
}
system.scheduler.scheduleWithFixedDelay(
Duration.Zero,
conf.historyWriter.period.millis,
self,
Tick)
}
9 changes: 3 additions & 6 deletions src/main/scala/analyzer/Trainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package analyzer

import akka.actor._
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.util.Timeout
import lib._
import redis.RedisClient
Expand All @@ -13,18 +12,16 @@ import smile.data.formula._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
import collection.JavaConverters._
import scala.jdk.CollectionConverters._

object Trainer {
def props(cassandraClient: CassandraClient, redisClient: RedisClient)
(implicit materializer: ActorMaterializer) =
Props(classOf[Trainer], cassandraClient, redisClient, materializer)
def props(cassandraClient: CassandraClient, redisClient: RedisClient) =
Props(classOf[Trainer], cassandraClient, redisClient)

private final case object Tick
}

class Trainer(cassandraClient: CassandraClient, redisClient: RedisClient)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Trainer._

Expand Down
9 changes: 3 additions & 6 deletions src/main/scala/dashboard/Dashboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import analyzer.Endpoint.Stats
import lib._
Expand All @@ -15,15 +14,13 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.sys.process._

object Dashboard {
def props(cassandraClient: CassandraClient, endpoint: ActorRef)
(implicit materializer: ActorMaterializer) =
Props(classOf[Dashboard], cassandraClient, endpoint, materializer)
def props(cassandraClient: CassandraClient, endpoint: ActorRef) =
Props(classOf[Dashboard], cassandraClient, endpoint)

final case class Perf(timings: List[Double], actorStats: Map[String, Double])
}

class Dashboard(cassandraClient: CassandraClient, endpoint: ActorRef)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Dashboard._

Expand Down Expand Up @@ -96,7 +93,7 @@ class Dashboard(cassandraClient: CassandraClient, endpoint: ActorRef)
log.info(s"Querying $url")
Process(runCmd).!
// Second run, for stats
val stream = csvCmd lineStream_! ProcessLogger(_ => ())
val stream = csvCmd lazyLines_! ProcessLogger(_ => ())
val values = stream.flatMap { (line) => line match {
case CsvPattern(responseTime, dnsLookup, dns, requestWrite, responseDelay, responseRead) =>
Some(responseTime.toDouble * 1000)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/lib/CassandraClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet
import com.datastax.oss.driver.api.querybuilder.QueryBuilder._
import com.datastax.oss.driver.api.querybuilder.relation._

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

final case class Entry(sensor: String, ts: java.time.Instant, value: Double, anomaly: Int)

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/lib/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package lib
import com.typesafe.config.{ConfigBeanFactory, ConfigFactory}

import scala.beans.BeanProperty
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

class MqttConfig {
@BeanProperty var broker = ""
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/lib/HttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives._
import akka.http.scaladsl.server.Directives.{path, _}
import akka.stream.ActorMaterializer

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand All @@ -29,7 +28,6 @@ class HttpClient(
supervisor: ActorRef
)(implicit
system: ActorSystem,
materializer: ActorMaterializer,
executionContext: ExecutionContext
) {
implicit val timeout: Timeout = Timeout(1.seconds)
Expand All @@ -50,7 +48,7 @@ class HttpClient(
}
}

Http().bindAndHandle(cdnExtended, address, port).onComplete {
Http().newServerAt(address, port).bind(cdnExtended).onComplete {
case Success(binding) => supervisor ! HttpConnected(binding)
case Failure(ex) => supervisor ! HttpConnectionFailure(ex)
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/scala/mqtt/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mqtt

import akka.actor._
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import org.eclipse.paho.client.mqttv3._
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.querybuilder.QueryBuilder._
Expand All @@ -14,15 +13,13 @@ import mqtt.Producer.MqttEntry
import scala.concurrent.ExecutionContext

object Consumer {
def props(mqttClient: MqttClient, session: CqlSession)
(implicit materializer: ActorMaterializer) =
Props(classOf[Consumer], mqttClient, session, materializer)
def props(mqttClient: MqttClient, session: CqlSession) =
Props(classOf[Consumer], mqttClient, session)

final case class Arrived(message: MqttMessage)
}

class Consumer(mqttClient: MqttClient, session: CqlSession)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Consumer._

Expand Down
15 changes: 7 additions & 8 deletions src/main/scala/mqtt/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.event.LoggingAdapter
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import org.stringtemplate.v4._
import org.eclipse.paho.client.mqttv3._
import lib._
Expand All @@ -16,9 +15,8 @@ import scala.io.Source
import scala.util.{Failure, Success}

object Producer {
def props(mqttClient: MqttClient)
(implicit materializer: ActorMaterializer) =
Props(classOf[Producer], mqttClient, materializer)
def props(mqttClient: MqttClient) =
Props(classOf[Producer], mqttClient)

final case class MqttEntry(sensor: String, value: Double, anomaly: Int)
final case class SensorModel(name: String, isNormal: Boolean)
Expand All @@ -27,7 +25,6 @@ object Producer {
}

class Producer(mqttClient: MqttClient)
(implicit materializer: ActorMaterializer)
extends Actor with ActorLogging {
import Producer._

Expand Down Expand Up @@ -106,7 +103,9 @@ class Producer(mqttClient: MqttClient)
log.error(s"Failed to establish HTTP connection $ex")
}

system.scheduler.schedule(0.millis, conf.mqtt.timeout.millis) {
self ! Tick
}
system.scheduler.scheduleWithFixedDelay(
Duration.Zero,
conf.mqtt.timeout.millis,
self,
Tick)
}
2 changes: 1 addition & 1 deletion start.sc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ val EnvDir = "target/env"
def isCassandraRunning: Boolean = {
val lsofPattern = raw"""p(\d+)""".r
val lsof = Seq("lsof", "-Fp", "-i", ":9042")
lsof.lineStream_!.map {
lsof.lazyLines_!.map {
case lsofPattern(_) => true
case _ => false
}.exists(x => x)
Expand Down

0 comments on commit 07da30c

Please sign in to comment.