Skip to content

Commit

Permalink
Documentation has been fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
nuald committed Dec 11, 2020
1 parent ff30eda commit ca2ae58
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 24 deletions.
49 changes: 35 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,17 @@ Dump the entries into the CSV file:
Read the CSV file and extract the features and the labels for the particular sensor:

```scala
import smile.data._
import smile.data.formula._
import smile.data.`type`._

import scala.jdk.CollectionConverters._

// Declare the class to get better visibility into the data
case class Row(sensor: String, ts: String, value: Double, anomaly: Int)

// Read the values from the CSV file
val iter = scala.io.Source.fromFile("list.csv").getLines
val iter = scala.io.Source.fromFile("list.csv").getLines()

// Get the data
val l = iter.map(_.split(",") match {
Expand All @@ -179,11 +185,20 @@ val l = iter.map(_.split(",") match {
// Get the sensor name for further analysis
val name = l.head.sensor

// Features are multi-dimensional, labels are integers
val mapping = (x: Row) => (Array(x.value), x.anomaly)

// Extract the features and the labels for the given sensor
val (features, labels) = l.filter(_.sensor == name).map(mapping).unzip
// Prepare data frame for the given sensor
val data = DataFrame.of(
l.filter(_.sensor == name)
.map(row => Tuple.of(
Array(
row.value.asInstanceOf[AnyRef],
row.anomaly.asInstanceOf[AnyRef]),
DataTypes.struct(
new StructField("value", DataTypes.DoubleType),
new StructField("anomaly", DataTypes.IntegerType))))
.asJava)

// Declare formula for the features and the labels
val formula = "anomaly" ~ "value"

```

Expand All @@ -193,7 +208,8 @@ Fast analysis (labels are ignored because we don't use any training here):

```scala
// Get the first 200 values
val values = features.flatten.take(200)
val values = data.stream()
.limit(200).map(_.getDouble(0)).iterator().asScala.toArray

// Use the fast analyzer for the sample values
val samples = Seq(10, 200, -100)
Expand All @@ -214,10 +230,10 @@ import scala.sys.process._
import smile.classification.randomForest

// Fit the model
val rf = randomForest(features.toArray, labels.toArray)
val rf = randomForest(formula, data)

// Get the dot diagram for a sample tree
val desc = rf.getTrees()(0).dot
val desc = rf.trees()(0).dot

// View the diagram (macOS example)
s"echo $desc" #| "dot -Tpng" #| "open -a Preview -f" !
Expand Down Expand Up @@ -251,11 +267,16 @@ val futureRf = using(new ObjectInputStream(new FileInputStream("target/rf.bin"))
val rf = futureRf.get

// Use the loaded model for the sample values
val samples = Seq(10, 200, -100)
val samples = Seq(10.0, 200.0, -100.0)
samples.map { sample =>
val probability = new Array[Double](2)
val prediction = rf.predict(Array(sample), probability)
(prediction, probability)
val posteriori = new Array[Double](2)
val prediction = rf.predict(
Tuple.of(
Array(sample),
DataTypes.struct(
new StructField("value", DataTypes.DoubleType))),
posteriori)
(prediction, posteriori)
}

```
Expand All @@ -278,7 +299,7 @@ Verify the history of detecting anomalies using CQL:
$ cqlsh -e "select * from sandbox.analysis limit 10;"

In most cases it's better to use specialized solutions for clustering,
for example, [Kubernetes](https://developer.lightbend.com/guides/akka-cluster-kubernetes-k8s-deploy/).
for example, [Kubernetes](https://doc.akka.io/docs/akka-management/current/kubernetes-deployment/forming-a-cluster.html).
However, in the sample project, the server and clients are configured manually
for demonstration purposes.

Expand Down
8 changes: 4 additions & 4 deletions resources/akka/cluster.conf
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
akka {
remote {
log-remote-lifecycle-events = off
netty.tcp {
remote.artery {
canonical {
hostname = "<client-host>"
port = <client-port>
}
}

cluster {
seed-nodes = ["akka.tcp://cluster@<server-host>:2551"]
seed-nodes = ["akka://cluster@<server-host>:2551"]
roles = [<role>]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
14 changes: 9 additions & 5 deletions src/main/scala/mqtt/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ import akka.event.LoggingAdapter
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.Directives._
import org.stringtemplate.v4._
import org.eclipse.paho.client.mqttv3._
import lib._
import org.eclipse.paho.client.mqttv3._
import org.stringtemplate.v4._

import scala.concurrent.duration._
import scala.beans.BeanProperty
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

object Producer {
def props(mqttClient: MqttClient) =
Props(classOf[Producer], mqttClient)

final case class MqttEntry(sensor: String, value: Double, anomaly: Int)
final case class SensorModel(name: String, isNormal: Boolean)
final case class SensorModel(
@BeanProperty name: String,
@BeanProperty isNormal: Boolean)

private final case object Tick
}
Expand Down Expand Up @@ -90,7 +94,7 @@ class Producer(mqttClient: MqttClient)
get {
val src = Source.fromFile("resources/producer/index.html").mkString
val model = sensors.map(name => SensorModel(name, state(name) == "normal"))
val template = new ST(src, '$', '$').add("sensors", model)
val template = new ST(src, '$', '$').add("sensors", model.asJava)
val dst = template.render()
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, dst))
}
Expand Down
5 changes: 4 additions & 1 deletion src/test/scala/lib/EntriesFixture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class EntriesFixture {
// Get the sensor name for further analysis
val name = l.head.sensor

// Prepare data frame for the given sensor
val data = DataFrame.of(
l.filter(_.sensor == name)
.map(row => Tuple.of(
Expand All @@ -37,7 +38,9 @@ class EntriesFixture {
DataTypes.struct(
new StructField("value", DataTypes.DoubleType),
new StructField("anomaly", DataTypes.IntegerType))))
.asJava)
.asJava)

// Declare formula for the features and the labels
val formula = "anomaly" ~ "value"
(data, formula)
}
Expand Down

0 comments on commit ca2ae58

Please sign in to comment.