diff --git a/server/exercise/build.sbt b/server/exercise/build.sbt index 7c82502a..ead2248a 100644 --- a/server/exercise/build.sbt +++ b/server/exercise/build.sbt @@ -33,5 +33,6 @@ libraryDependencies ++= Seq( scalatest % "test", scalacheck % "test", akka.testkit % "test", - spray.testkit % "test" + spray.testkit % "test", + akka.streams.testkit % "test" ) diff --git a/server/exercise/src/main/resources/classification.conf b/server/exercise/src/main/resources/classification.conf index 4505af31..28a64393 100644 --- a/server/exercise/src/main/resources/classification.conf +++ b/server/exercise/src/main/resources/classification.conf @@ -1,10 +1,12 @@ classification { + frequency = 100 # Hz model.path = "/models" gesture { tap { model = "svm-model-tap-features" - size = 25 + size = 25 # number of sampled events (seconds = size / frequency) + threshold = 0.80 } } } diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/AccelerometerData.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/AccelerometerData.scala index 24026689..d138f6eb 100644 --- a/server/exercise/src/main/scala/com/eigengo/lift/exercise/AccelerometerData.scala +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/AccelerometerData.scala @@ -1,10 +1,5 @@ package com.eigengo.lift.exercise -import scodec.bits.{BitVector, ByteOrdering} - -import scala.annotation.tailrec -import scalaz.\/ - /** * Accelerometer data groups ``values`` at the given ``samplingRate`` * @param samplingRate the sampling rate in Hz diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/svm/SVMModelParser.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/svm/SVMModelParser.scala index 92b53856..798942b9 100644 --- a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/svm/SVMModelParser.scala +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/svm/SVMModelParser.scala @@ -36,7 +36,7 @@ private[svm] trait ParserUtils extends StringBuilding { def FractionalPart = rule { optional('.' ~ oneOrMore(Digit)) ~ optional(ignoreCase('e') ~ optional(anyOf("+-")) ~ oneOrMore(Digit)) } - + def Number: Rule1[String] = rule { capture(optional(anyOf("+-")) ~ oneOrMore(Digit) ~ optional(FractionalPart)) ~> ((n: String) => push((Try(n.toInt) orElse Try(n.toDouble)).get.toString)) } diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ClassificationAssertions.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ClassificationAssertions.scala new file mode 100644 index 00000000..a66bf242 --- /dev/null +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ClassificationAssertions.scala @@ -0,0 +1,28 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +object ClassificationAssertions { + + /** + * Facts that may hold of sensor data + */ + sealed trait Fact + case class Gesture(name: String, matchProbability: Double) extends Fact + case object Unknown extends Fact + + /** + * Quantifier-free assertions that may hold of sensor data + */ + sealed trait Assertion + case class Predicate(fact: Fact) extends Assertion + case class Conjunction(assert1: Assertion, assert2: Assertion, assertRemaining: Assertion*) extends Assertion + case class Disjunction(assert1: Assertion, assert2: Assertion, assertRemaining: Assertion*) extends Assertion + + /** + * Bind an assertion to a sensor data value. In doing this, assertion is true for that value. + * + * @param assertion assertion that is true for the sensor data value + * @param value sensor data that assertion holds for + */ + case class Bind[A](assertion: Assertion, value: A) + +} diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflows.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflows.scala new file mode 100644 index 00000000..fbd0f31c --- /dev/null +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflows.scala @@ -0,0 +1,227 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.scaladsl._ +import breeze.linalg.DenseMatrix +import com.eigengo.lift.exercise.AccelerometerValue +import com.eigengo.lift.exercise.classifiers.svm.{SVMClassifier, SVMModelParser} +import com.typesafe.config.Config + +/** + * Trait that implements reactive stream components that can: + * + * * [identifyGestureEvents] tap into sensor streams and trigger transformation events whenever a sample window is classified as a gesture + * * [mergeTransformations] merge collections of transformation events into a single transformation event + * * [modulateSensorNet] modulate the signals (e.g. by tagging them) in a network of sensors using a transformation signal + * * [gestureCollector] split tagged sensor streams using highest probability gesture matches + */ +// TODO: calling code needs to "normalise" sensor stream against the time dimension - i.e. here we assume that events occur with a known frequency (use `TickSource` as a driver for this?) +trait GestureWorkflows extends SVMClassifier { + + import ClassificationAssertions._ + import FlowGraphImplicits._ + + def name: String + def config: Config + + def frequency = { + val value = config.getInt("classification.frequency") + assert(value > 0) + value + } + def threshold = { + val value = config.getDouble(s"classification.gesture.$name.threshold") + assert(0 <= value && value <= 1) + value + } + def windowSize = { + val value = config.getInt(s"classification.gesture.$name.size") + assert(value > 0) + value + } + + // NOTE: here we accept throwing an exception in loading R libSVM models (since this indicates a catastrophic configuration error!) + private lazy val model = new SVMModelParser(name)(config).model.get + + /** + * Measures probability that sampled window is recognised as a gesture event. + * + * @param sample sampled data to be tested + * @return probability that a gesture was recognised in the sample window + */ + private def probabilityOfGestureEvent(sample: List[AccelerometerValue]): Double = { + require(sample.length == windowSize) + + val data = DenseMatrix(sample.map(v => (v.x.toDouble, v.y.toDouble, v.z.toDouble)): _*) + + predict(model, data, taylor_radial_kernel()).positiveMatch + } + + /** + * Flowgraph that taps the in-out stream and, if a gesture is recognised, sends a `Fact` message to the `tap` sink. + */ + class IdentifyGestureEvents { + val in = UndefinedSource[AccelerometerValue] + val out = UndefinedSink[AccelerometerValue] + val tap = UndefinedSink[Fact] + + val graph = PartialFlowGraph { implicit builder => + val split = Broadcast[AccelerometerValue] + + in ~> split ~> out + split ~> Flow[AccelerometerValue].transform(() => SlidingWindow[AccelerometerValue](windowSize)).map { (sample: List[AccelerometerValue]) => + if (sample.length == windowSize) { + // Saturated windows may be classified + val matchProbability = probabilityOfGestureEvent(sample) + + if (matchProbability > threshold) { + Gesture(name, matchProbability) + } else { + Unknown + } + } else { + // Truncated windows are never classified (these typically occur when the stream closes) + Unknown + } + } ~> tap + } + } + + object IdentifyGestureEvents { + def apply() = new IdentifyGestureEvents() + } + + /** + * Flowgraph that merges (via a user supplied function) a collection of input sources into a single output sink. + * + * @param size number of input sources that we are to bundle and merge + */ + class MergeSignals[A, B](size: Int, merge: Set[A] => B) { + require(size > 0) + + val in = (0 until size).map(_ => UndefinedSource[A]) + val out = UndefinedSink[B] + + val graph = PartialFlowGraph { implicit builder => + // We separate out size 1 case since `ZipN` nodes need at least 2 upstream nodes + if (size == 1) { + in.head ~> Flow[A].map(v => merge(Set(v))) ~> out + } else { + val zip = ZipN[A](in.size) + + for ((probe, index) <- in.zipWithIndex) { + probe ~> zip.in(index) + } + zip.out ~> Flow[Set[A]].map(merge) ~> out + } + } + } + + object MergeSignals { + def apply[A, B](size: Int)(merge: Set[A] => B) = { + new MergeSignals[A, B](size, merge) + } + } + + /** + * Flowgraph that modulates all sensors in a network of location tagged sensors. Messages on the `transform` source + * determine how signals in the sensor net are modulated or transformed. + * + * @param locations set of locations that make up the sensor network + */ + class ModulateSensorNet[A, B, L](locations: Set[L]) { + require(locations.nonEmpty) + + val in = locations.map(loc => (loc, UndefinedSource[A])).toMap + val transform = UndefinedSource[Fact] + val out = locations.map(loc => (loc, UndefinedSink[Bind[A]])).toMap + + val graph = PartialFlowGraph { implicit builder => + // We separate out 1-element case since `Broadcast` nodes need at least 2 downstream nodes + if (locations.size == 1) { + val zip = ZipWith[A, Fact, Bind[A]]((msg: A, tag: Fact) => tag match { + case Gesture(name, matchProb) => + Bind(Predicate(Gesture(name, matchProb)), msg) + + case Unknown => + Bind(Predicate(Unknown), msg) + }) + + in(locations.head) ~> zip.left + transform ~> zip.right + zip.out ~> out(locations.head) + } else { + val broadcast = Broadcast[Fact] + + transform ~> broadcast + for ((location, sensor) <- in) { + val zip = ZipWith[A, Fact, Bind[A]]((msg: A, tag: Fact) => tag match { + case Gesture(name, matchProb) => + Bind(Predicate(Gesture(name, matchProb)), msg) + + case Unknown => + Bind(Predicate(Unknown), msg) + }) + + sensor ~> zip.left + broadcast ~> zip.right + zip.out ~> out(location) + } + } + } + } + + object ModulateSensorNet { + def apply[A, B, L](locations: Set[L]) = new ModulateSensorNet[A, B, L](locations) + } + + /** + * Flowgraph that monitors the `inputLocations` sensor network for recognisable gestures. When gestures are detected, + * messages on the `outputLocations` sensor network are tagged and grouped. + * + * @param inputLocations locations that make up the input sensor network + * @param outputLocations locations that make up the output sensor network + */ + class GestureClassification[L](inputLocations: Set[L], outputLocations: Set[L]) { + require(inputLocations.nonEmpty) + require(outputLocations.nonEmpty) + + private val identify = inputLocations.map(loc => (loc, IdentifyGestureEvents())).toMap + private val modulate = ModulateSensorNet[AccelerometerValue, Bind[AccelerometerValue], L](outputLocations) + private val merge = MergeSignals[Fact, Fact](inputLocations.size) { (obs: Set[Fact]) => + require(obs.nonEmpty) + + if (obs.filter(_.isInstanceOf[Gesture]).asInstanceOf[Set[Gesture]].filter(_.name == name).nonEmpty) { + obs.filter(_.isInstanceOf[Gesture]).asInstanceOf[Set[Gesture]].filter(_.name == name).maxBy(_.matchProbability) + } else { + obs.head + } + } + + // Tapped sensors - monitored for recognisable gestures + val inputTap = inputLocations.map(loc => (loc, identify(loc).in)).toMap + val outputTap = inputLocations.map(loc => (loc, identify(loc).out)).toMap + + // Modulation sensors - binds sensor data with gesture facts + val inputModulate = outputLocations.map(loc => (loc, modulate.in(loc))).toMap + val outputModulate = outputLocations.map(loc => (loc, modulate.out(loc))).toMap + + val graph = PartialFlowGraph { implicit builder => + builder.importPartialFlowGraph(merge.graph) + + // Wire in tapped sensors + for ((loc, index) <- inputLocations.zipWithIndex) { + builder.importPartialFlowGraph(identify(loc).graph) + builder.connect(identify(loc).tap, Flow[Fact], merge.in(index)) + } + + // Wire in modulation + builder.importPartialFlowGraph(modulate.graph) + builder.connect(merge.out, Flow[Fact], modulate.transform) + } + } + + object GestureClassification { + def apply[L](inputLocations: Set[L], outputLocations: Set[L]) = new GestureClassification[L](inputLocations, outputLocations) + } + +} diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala new file mode 100644 index 00000000..e7d107a5 --- /dev/null +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala @@ -0,0 +1,80 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.stage.{TerminationDirective, Directive, Context, PushPullStage} +import scala.collection.mutable + +/** + * Streaming stage that buffers events and slides a window over streaming input data. Transmits each observed window + * downstream. + * + * @param size size of the internal buffer and so the sliding window size + */ +class SlidingWindow[A] private (size: Int) extends PushPullStage[A, List[A]] { + require(size > 0) + + private val buffer = mutable.Queue[A]() + private var isSaturated = false + + override def onPush(elem: A, ctx: Context[List[A]]): Directive = { + if (buffer.length == size) { + // Buffer is full, so push new window + buffer.dequeue() + buffer.enqueue(elem) + ctx.push(buffer.toList) + } else { + // Buffer is not yet full, so keep consuming from our upstream + buffer.enqueue(elem) + if (buffer.length == size) { + // Buffer has become full, so push new window and record saturation + isSaturated = true + ctx.push(buffer.toList) + } else { + ctx.pull() + } + } + } + + override def onPull(ctx: Context[List[A]]): Directive = { + if (ctx.isFinishing) { + // Streaming stage is shutting down, so we ensure that all buffer elements are flushed prior to finishing + if (buffer.isEmpty) { + // Buffer is empty, so we simply finish + ctx.finish() + } else if (buffer.length == 1) { + // Buffer is non-empty, so empty it by sending undersized (non-empty) truncated window sequence and finish + if (isSaturated) { + // Buffer was previously saturated, so head element has already been seen + buffer.dequeue() + ctx.finish() + } else { + // Buffer was never saturated, so head element needs to be pushed + ctx.pushAndFinish(List(buffer.dequeue())) + } + } else { + // Buffer is non-empty, so empty it by sending undersized (non-empty) truncated window sequence - we will eventually finish here + if (isSaturated) { + // Buffer was previously saturated, so head element has already been seen + buffer.dequeue() + ctx.push(buffer.toList) + } else { + // Buffer was never saturated, so head element should be part of truncated window + val window = buffer.toList + buffer.dequeue() + ctx.push(window) + } + } + } else { + ctx.pull() + } + } + + override def onUpstreamFinish(ctx: Context[List[A]]): TerminationDirective = { + ctx.absorbTermination() + } +} + +object SlidingWindow { + def apply[A](size: Int): SlidingWindow[A] = { + new SlidingWindow(size) + } +} diff --git a/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodes.scala b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodes.scala new file mode 100644 index 00000000..475a39e3 --- /dev/null +++ b/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodes.scala @@ -0,0 +1,57 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.scaladsl.FlexiMerge + +/** +* Flowgraph merge node that expects a message on each of its inputs. The collection of input messages is then outputted. +* This merge node can be thought of as a generalised Zip or ZipWith node. +* +* @param size number of inputs that may be joined to this merge node +*/ +abstract class ZipBundle[A] private[workflows] (size: Int) extends FlexiMerge[Set[A]] { + require(size > 1, s"ZipBundle must have at least 2 connected inputs ($size were given)") + + import FlexiMerge._ + + protected val inPorts = (0 until size).map { _ => createInputPort[A]() }.toVector + + def createMergeLogic() = new MergeLogic[Set[A]] { + def initialState = State[ReadAllInputs](ReadAll(inPorts)) { (ctx, _, inputs) => + ctx.emit(inPorts.flatMap(port => inputs.get[A](port)).toSet) + + SameState[A] + } + + def inputHandles(inputCount: Int) = { + require(inputCount == size, s"ZipBundle must have $size connected inputs, was $inputCount") + + inPorts + } + + override def initialCompletionHandling = eagerClose + } + +} + +/** + * Utility classes and factories for using ZipBundle. They differ in how we reference the input ports. + * + * ZipN - input ports are referenced using indexes from 0 until size + * ZipSet - input ports are referenced using a set of locations or addresses + */ + +class ZipN[A] private[workflows] (size: Int) extends ZipBundle[A](size) { + val in = inPorts +} + +object ZipN { + def apply[A](size: Int) = new ZipN[A](size) +} + +class ZipSet[A, L] private[workflows] (keys: Set[L]) extends ZipBundle[A](keys.size) { + val in = keys.zipWithIndex.map { case (key, index) => (key, inPorts(index)) }.toMap +} + +object ZipSet { + def apply[A, L](keys: Set[L]) = new ZipSet[A, L](keys) +} diff --git a/server/exercise/src/test/resources/samples/tap.csv b/server/exercise/src/test/resources/samples/tap.csv new file mode 100644 index 00000000..fdc37a1a --- /dev/null +++ b/server/exercise/src/test/resources/samples/tap.csv @@ -0,0 +1,744 @@ +32,48,-968 +32,72,-1040 +32,48,-1016 +32,72,-1032 +56,72,-992 +40,80,-1000 +32,72,-1008 +24,56,-1008 +16,56,-992 +8,56,-1008 +8,48,-992 +-8,48,-1016 +8,40,-1016 +24,40,-1008 +32,40,-1016 +32,56,-1008 +32,56,-1016 +24,72,-984 +24,80,-1000 +24,88,-976 +32,88,-1032 +32,80,-1040 +40,80,-1040 +40,72,-1032 +40,56,-1024 +40,48,-1000 +32,48,-1016 +24,40,-1000 +24,40,-992 +24,32,-1008 +16,32,-1024 +16,40,-1040 +32,48,-1016 +40,56,-1032 +64,64,-1016 +48,104,-1024 +64,88,-1032 +64,96,-1032 +40,80,-1064 +40,56,-1048 +56,48,-1016 +40,32,-1032 +32,40,-1008 +24,40,-1024 +40,16,-1064 +32,-8,-1048 +16,-24,-1040 +8,-40,-1016 +-8,-24,-1008 +-8,-24,-992 +-8,-16,-992 +-16,-16,-1008 +-16,-16,-1000 +-16,-8,-1000 +-16,-8,-992 +-24,8,-976 +-16,8,-976 +-16,8,-984 +-16,8,-968 +-16,24,-968 +-16,24,-960 +-24,24,-968 +-24,32,-976 +-24,32,-992 +-32,40,-992 +-32,40,-984 +-24,40,-992 +-24,40,-984 +-24,40,-992 +-32,32,-984 +-40,16,-984 +-40,0,-1000 +-40,-8,-992 +-40,-16,-992 +-32,0,-1008 +-32,8,-1016 +-32,16,-1008 +-24,16,-1000 +-32,-8,-1008 +-32,-8,-1000 +-40,-8,-1000 +-48,8,-1000 +-48,24,-992 +-40,32,-984 +-48,16,-976 +-56,24,-976 +-72,32,-976 +-48,24,-976 +-16,0,-1008 +-24,16,-984 +-24,40,-992 +-56,72,-1008 +-40,64,-1000 +-32,56,-1016 +-32,24,-968 +-48,32,-992 +-56,40,-984 +-48,24,-1000 +-40,16,-1032 +-48,32,-992 +-40,40,-976 +-40,48,-984 +-80,48,-984 +-72,32,-1000 +-56,32,-1000 +-56,24,-992 +-48,24,-976 +-72,40,-968 +-72,40,-960 +-72,40,-976 +-72,48,-952 +-72,40,-968 +-56,40,-968 +-56,32,-984 +-64,24,-960 +-72,40,-960 +-72,24,-952 +-56,24,-888 +-64,40,-880 +-56,32,-928 +-40,40,-968 +-56,80,-984 +-64,120,-992 +-64,104,-1008 +-48,80,-992 +-32,80,-952 +-32,72,-952 +-48,88,-976 +-24,80,-1016 +-16,64,-1008 +-40,48,-1016 +-24,48,-1064 +-8,16,-1056 +8,40,-1064 +8,56,-1048 +0,48,-1048 +0,56,-1056 +0,88,-1056 +0,80,-1072 +16,48,-1016 +8,32,-1072 +16,48,-1088 +16,96,-1152 +-8,104,-1088 +0,96,-1104 +-16,72,-1072 +-16,72,-1064 +-24,80,-1056 +-8,80,-1040 +8,80,-1080 +24,80,-1032 +56,64,-1056 +88,64,-1016 +120,80,-1024 +112,80,-1024 +112,96,-1024 +112,104,-1016 +128,144,-976 +128,160,-1008 +136,136,-1024 +168,152,-1080 +208,160,-1080 +224,160,-1088 +240,152,-1040 +224,136,-1080 +232,128,-1072 +224,88,-1024 +216,88,-992 +232,112,-984 +256,136,-936 +272,160,-976 +272,168,-976 +264,152,-944 +248,136,-960 +240,120,-936 +232,112,-880 +240,136,-944 +256,192,-920 +248,272,-936 +216,328,-904 +208,336,-928 +192,288,-904 +160,288,-984 +192,312,-1040 +168,376,-936 +152,416,-960 +152,448,-968 +128,480,-968 +128,464,-976 +80,488,-960 +64,504,-936 +40,512,-880 +24,520,-840 +24,536,-776 +-8,544,-808 +0,552,-768 +-16,552,-824 +-32,528,-808 +-64,560,-848 +-56,560,-808 +-80,576,-784 +-104,536,-800 +-120,480,-808 +-128,432,-792 +-112,408,-760 +-96,416,-808 +-72,416,-824 +-32,416,-816 +0,408,-848 +24,352,-848 +32,280,-920 +56,224,-848 +88,240,-808 +88,248,-848 +96,280,-816 +64,280,-824 +40,248,-832 +8,200,-880 +-8,168,-1008 +40,200,-1048 +96,240,-1064 +80,256,-952 +40,224,-960 +8,200,-936 +-32,184,-952 +-56,224,-984 +-56,328,-1032 +-32,424,-1056 +-16,504,-1080 +-16,544,-1064 +-8,512,-1032 +8,480,-912 +16,432,-800 +48,384,-648 +64,376,-576 +112,408,-648 +160,488,-672 +224,544,-704 +264,528,-768 +328,472,-784 +360,392,-840 +384,304,-768 +400,232,-688 +456,144,-640 +520,136,-664 +568,208,-712 +592,352,-760 +600,448,-760 +592,480,-752 +608,376,-768 +648,296,-792 +680,272,-776 +680,312,-760 +656,320,-792 +664,280,-808 +656,232,-736 +680,200,-680 +688,216,-664 +728,264,-608 +728,312,-640 +736,320,-664 +792,288,-672 +824,280,-616 +840,312,-608 +848,376,-560 +832,416,-536 +832,416,-520 +848,344,-496 +864,216,-520 +872,88,-536 +840,-32,-504 +856,-128,-512 +896,-144,-616 +976,-128,-680 +1056,-32,-632 +1088,120,-616 +1120,160,-616 +1112,152,-624 +1112,72,-624 +1096,-32,-648 +1112,-144,-656 +1112,-168,-600 +1104,-192,-600 +1088,-152,-528 +1096,-112,-472 +1120,-96,-472 +1120,-80,-544 +1080,-96,-592 +1024,-120,-536 +968,-136,-496 +912,-152,-472 +952,-120,-416 +976,-72,-384 +992,8,-416 +984,24,-560 +976,24,-648 +960,-8,-712 +944,-24,-744 +944,16,-728 +920,104,-712 +904,192,-760 +872,248,-744 +872,248,-776 +856,240,-784 +840,240,-840 +832,264,-816 +784,240,-848 +768,232,-808 +752,232,-768 +712,240,-720 +704,256,-704 +704,296,-640 +712,328,-624 +728,352,-592 +744,360,-576 +752,328,-576 +768,304,-544 +776,288,-504 +776,272,-544 +784,248,-560 +784,200,-576 +768,152,-544 +784,120,-536 +808,112,-480 +824,128,-416 +840,152,-440 +808,176,-480 +840,168,-488 +840,192,-448 +840,192,-472 +840,192,-472 +856,176,-512 +864,160,-528 +840,112,-520 +848,80,-504 +824,72,-528 +816,96,-496 +816,136,-464 +784,136,-528 +784,136,-432 +792,120,-424 +816,152,-376 +848,176,-448 +872,200,-504 +864,176,-536 +864,152,-536 +880,136,-504 +888,120,-512 +888,128,-408 +904,136,-472 +960,144,-512 +976,160,-552 +992,200,-624 +968,224,-592 +1008,256,-592 +1048,272,-600 +1096,256,-640 +1096,208,-640 +1088,176,-640 +1056,144,-584 +1024,120,-544 +1008,144,-512 +952,160,-488 +920,144,-544 +936,136,-496 +928,112,-448 +904,72,-464 +888,64,-448 +888,80,-424 +896,88,-384 +896,104,-376 +912,136,-384 +920,192,-400 +920,240,-448 +920,216,-472 +920,192,-472 +904,136,-392 +904,128,-360 +920,136,-368 +952,136,-400 +944,168,-368 +952,152,-424 +944,152,-400 +968,160,-336 +928,184,-424 +928,192,-448 +928,224,-472 +904,208,-528 +920,128,-464 +920,112,-416 +920,152,-456 +936,184,-440 +944,232,-440 +936,248,-480 +944,200,-504 +952,168,-504 +936,168,-504 +912,216,-528 +904,240,-560 +912,256,-584 +928,192,-720 +944,96,-720 +944,40,-688 +2360,1864,4016 +2112,-1104,-4080 +1672,1232,-1280 +1408,264,-1576 +488,-1064,-1000 +104,-248,-1616 +1360,224,-408 +1416,840,-216 +864,648,-880 +688,88,-328 +840,-136,-328 +920,-104,-352 +904,-40,-224 +864,72,-248 +840,184,-288 +864,272,-272 +856,232,-328 +872,160,-440 +888,144,-504 +912,200,-544 +936,256,-632 +928,280,-512 +944,264,-576 +952,224,-520 +944,176,-464 +952,128,-440 +952,128,-464 +960,160,-400 +960,216,-384 +968,248,-376 +952,224,-368 +920,184,-416 +928,168,-400 +928,168,-472 +928,176,-400 +928,192,-448 +920,200,-440 +920,192,-424 +920,184,-392 +920,192,-400 +928,184,-424 +928,176,-464 +936,160,-424 +912,152,-416 +912,144,-392 +904,128,-432 +904,128,-472 +912,136,-440 +920,160,-424 +928,192,-432 +920,160,-360 +888,160,-400 +896,176,-416 +904,176,-424 +912,184,-440 +912,152,-448 +912,144,-392 +904,152,-432 +920,144,-440 +920,160,-416 +920,160,-448 +912,176,-432 +912,200,-456 +920,192,-464 +912,184,-464 +912,136,-416 +920,136,-424 +928,144,-464 +936,160,-432 +920,176,-448 +928,192,-448 +912,200,-440 +912,168,-504 +920,176,-448 +928,168,-456 +928,144,-448 +904,144,-440 +912,160,-408 +920,168,-424 +928,184,-432 +928,192,-416 +904,192,-384 +912,184,-448 +920,176,-456 +928,168,-464 +928,168,-440 +928,160,-408 +896,168,-416 +912,160,-408 +920,168,-424 +912,168,-432 +928,200,-416 +920,184,-400 +920,168,-440 +928,160,-448 +904,152,-424 +936,160,-400 +952,208,-416 +920,192,-472 +920,184,-424 +944,168,-448 +952,160,-368 +944,152,-448 +928,160,-448 +920,152,-448 +928,160,-448 +936,152,-440 +928,160,-376 +936,160,-408 +936,168,-456 +944,168,-416 +928,176,-456 +928,144,-440 +936,144,-448 +976,144,-424 +952,152,-424 +944,200,-416 +944,184,-392 +960,184,-440 +944,176,-472 +944,160,-424 +944,144,-440 +944,144,-408 +936,144,-400 +928,168,-408 +936,184,-456 +952,176,-408 +944,176,-448 +936,160,-432 +936,144,-424 +936,152,-432 +936,160,-384 +896,160,-424 +936,168,-440 +936,184,-456 +944,184,-432 +928,192,-440 +944,184,-416 +936,160,-448 +944,152,-432 +912,144,-440 +920,152,-400 +920,192,-400 +920,184,-400 +928,192,-432 +920,184,-448 +912,184,-360 +920,152,-488 +904,136,-464 +896,160,-408 +896,160,-400 +896,184,-408 +896,168,-416 +888,168,-440 +928,184,-360 +936,184,-448 +920,168,-480 +952,168,-464 +968,176,-448 +960,200,-472 +952,224,-496 +944,176,-544 +936,112,-576 +928,48,-568 +936,40,-576 +936,72,-552 +928,128,-552 +2504,-1328,1024 +-760,1376,-4080 +160,656,-224 +984,-344,-3128 +520,-1032,968 +808,-768,-1688 +1712,160,-232 +1376,832,-1176 +800,480,-704 +680,-24,-712 +888,-104,-648 +976,-56,-496 +952,-8,-336 +864,136,-296 +888,240,-304 +912,256,-336 +888,184,-360 +896,112,-400 +936,168,-400 +952,256,-368 +928,288,-360 +944,256,-392 +952,176,-424 +976,168,-376 +992,160,-344 +976,168,-360 +952,168,-368 +952,176,-336 +952,176,-312 +952,168,-352 +944,168,-312 +936,168,-368 +928,184,-384 +928,176,-400 +928,176,-400 +920,200,-424 +912,176,-336 +936,160,-352 +928,168,-360 +928,176,-392 +928,192,-344 +928,184,-416 +912,168,-448 +928,152,-448 +928,120,-424 +936,112,-400 +952,136,-424 +944,160,-392 +936,192,-456 +952,184,-432 +872,208,-520 +864,184,-488 +848,176,-504 +848,184,-544 +848,200,-552 +848,200,-568 +848,200,-568 +856,184,-568 +848,176,-552 +824,160,-528 +808,160,-512 +792,160,-528 +776,168,-536 +768,168,-544 +752,168,-520 +744,128,-528 +792,136,-600 +784,152,-576 +760,144,-560 +776,176,-552 +744,184,-552 +728,200,-552 +720,208,-560 +712,168,-592 +720,144,-616 +744,144,-584 +744,160,-576 +736,144,-592 +712,168,-576 +688,184,-584 +696,200,-616 +704,192,-696 +720,160,-688 +616,208,-632 +664,232,-616 +744,152,-616 +752,112,-672 +728,120,-680 +712,184,-632 +728,256,-616 +744,192,-640 +760,152,-656 +776,128,-672 +792,120,-680 +800,104,-656 +776,120,-688 +784,64,-688 +792,56,-712 +776,48,-696 +784,48,-696 +776,64,-704 +784,168,-696 +776,208,-648 +744,192,-696 +776,176,-656 +800,168,-616 +776,136,-664 +784,144,-672 +776,120,-688 +768,128,-720 +760,176,-648 +800,152,-696 +784,144,-664 +800,136,-672 +768,112,-672 +752,88,-712 +752,80,-688 +752,112,-672 +744,96,-720 +776,56,-616 +824,120,-616 +800,112,-608 +784,120,-648 +752,128,-680 +728,120,-688 +736,128,-688 +768,120,-672 +792,104,-648 +832,80,-608 +824,96,-640 +776,64,-664 +768,72,-648 +768,88,-680 +768,128,-680 +776,128,-728 +800,88,-680 +840,104,-648 +840,96,-656 +816,112,-656 +792,120,-632 +776,96,-664 +752,128,-648 +768,152,-640 +776,144,-640 +776,136,-624 +800,128,-680 +800,104,-664 +808,104,-688 +792,120,-648 +784,112,-624 +800,104,-632 +784,112,-664 +776,120,-704 +776,144,-616 +784,128,-656 +792,112,-648 +800,120,-672 +800,128,-632 +792,128,-624 +792,152,-656 +784,160,-648 +784,120,-648 +784,96,-672 +792,120,-640 +792,136,-616 +784,128,-640 +776,144,-632 +776,120,-624 +776,120,-648 +784,136,-656 +792,136,-664 +784,144,-640 +784,128,-624 +792,128,-616 diff --git a/server/exercise/src/test/scala/akka/stream/testkit/AkkaSpec.scala b/server/exercise/src/test/scala/akka/stream/testkit/AkkaSpec.scala new file mode 100644 index 00000000..3b502805 --- /dev/null +++ b/server/exercise/src/test/scala/akka/stream/testkit/AkkaSpec.scala @@ -0,0 +1,104 @@ +package akka.stream.testkit + +import akka.testkit.TestKit +import akka.event.LoggingAdapter +import scala.concurrent.duration.FiniteDuration +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import akka.dispatch.Dispatchers +import akka.actor.ActorSystem +import org.scalatest.Matchers +import com.typesafe.config.Config +import akka.event.Logging +import scala.concurrent.Future +import org.scalatest.WordSpecLike +import akka.testkit.TestEvent.Mute +import akka.testkit.DeadLettersFilter +import scala.concurrent.duration._ + +// TODO: until `akka-stream-testkit-experimental` package is released, we need a copy of this file for testing + +object AkkaSpec { // FIXME: remove once going back to project dependencies +val testConf: Config = ConfigFactory.parseString(""" + akka { + loggers = ["akka.testkit.TestEventListener"] + loglevel = "WARNING" + stdout-loglevel = "WARNING" + actor { + default-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 8 + parallelism-factor = 2.0 + parallelism-max = 8 + } + } + } + } + """) + + def mapToConfig(map: Map[String, Any]): Config = { + import scala.collection.JavaConverters._ + ConfigFactory.parseMap(map.asJava) + } + + def getCallerName(clazz: Class[_]): String = { + val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) + .dropWhile(_ matches "(java.lang.Thread|.*AkkaSpec.?$)") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + } + +} + +abstract class AkkaSpec(_system: ActorSystem) + extends TestKit(_system) with WordSpecLike with Matchers with BeforeAndAfterAll with WatchedByCoroner { + + def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), + ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) + + def this(s: String) = this(ConfigFactory.parseString(s)) + + def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap)) + + def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf)) + + val log: LoggingAdapter = Logging(system, this.getClass) + + override val invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected = true + + final override def beforeAll { + startCoroner + atStartup() + } + + final override def afterAll { + beforeTermination() + shutdown() + afterTermination() + stopCoroner() + } + + protected def atStartup() {} + + protected def beforeTermination() {} + + protected def afterTermination() {} + + def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit): Unit = + Future(body)(system.dispatchers.lookup(dispatcherId)) + + override def expectedTestDuration: FiniteDuration = 60.seconds + + def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) { + def mute(clazz: Class[_]): Unit = + sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue))) + if (messageClasses.isEmpty) mute(classOf[AnyRef]) + else messageClasses foreach mute + } + +} diff --git a/server/exercise/src/test/scala/akka/stream/testkit/ChainSetup.scala b/server/exercise/src/test/scala/akka/stream/testkit/ChainSetup.scala new file mode 100644 index 00000000..6dc97ca6 --- /dev/null +++ b/server/exercise/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -0,0 +1,30 @@ +package akka.stream.testkit + +import akka.actor.{ ActorRefFactory, ActorSystem } +import akka.stream.MaterializerSettings +import akka.stream.scaladsl._ +import org.reactivestreams.Publisher +import akka.stream.FlowMaterializer + +// TODO: until `akka-stream-testkit-experimental` package is released, we need a copy of this file for testing + +class ChainSetup[In, Out]( + stream: Flow[In, In] ⇒ Flow[In, Out], + val settings: MaterializerSettings, + materializer: FlowMaterializer, + toPublisher: (Source[Out], FlowMaterializer) ⇒ Publisher[Out])(implicit val system: ActorSystem) { + + def this(stream: Flow[In, In] ⇒ Flow[In, Out], settings: MaterializerSettings, toPublisher: (Source[Out], FlowMaterializer) ⇒ Publisher[Out])(implicit system: ActorSystem) = + this(stream, settings, FlowMaterializer(settings)(system), toPublisher)(system) + + def this(stream: Flow[In, In] ⇒ Flow[In, Out], settings: MaterializerSettings, materializerCreator: (MaterializerSettings, ActorRefFactory) ⇒ FlowMaterializer, toPublisher: (Source[Out], FlowMaterializer) ⇒ Publisher[Out])(implicit system: ActorSystem) = + this(stream, settings, materializerCreator(settings, system), toPublisher)(system) + + val upstream = StreamTestKit.PublisherProbe[In]() + val downstream = StreamTestKit.SubscriberProbe[Out]() + private val s = Source(upstream).via(stream(Flow[In])) + val publisher = toPublisher(s, materializer) + val upstreamSubscription = upstream.expectSubscription() + publisher.subscribe(downstream) + val downstreamSubscription = downstream.expectSubscription() +} diff --git a/server/exercise/src/test/scala/akka/stream/testkit/Coroner.scala b/server/exercise/src/test/scala/akka/stream/testkit/Coroner.scala new file mode 100644 index 00000000..99448e28 --- /dev/null +++ b/server/exercise/src/test/scala/akka/stream/testkit/Coroner.scala @@ -0,0 +1,273 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.testkit + +import java.io.PrintStream +import java.lang.management.{ ManagementFactory, ThreadInfo } +import java.util.Date +import java.util.concurrent.{ TimeoutException, CountDownLatch } +import org.scalatest.{ BeforeAndAfterAll, Suite } +import scala.annotation.tailrec +import scala.concurrent.{ Promise, Awaitable, CanAwait, Await } +import scala.concurrent.duration._ +import scala.util.control.NonFatal +import akka.testkit.{ TestKit, TestDuration } + +// TODO: until `akka-stream-testkit-experimental` package is released, we need a copy of this file for testing + +/** + * The Coroner can be used to print a diagnostic report of the JVM state, + * including stack traces and deadlocks. A report can be printed directly, by + * calling `printReport`. Alternatively, the Coroner can be asked to `watch` + * the JVM and generate a report at a later time - unless the Coroner is cancelled + * by that time. + * + * The latter method is useful for printing diagnostics in the event that, for + * example, a unit test stalls and fails to cancel the Coroner in time. The + * Coroner will assume that the test has "died" and print a report to aid in + * debugging. + */ +object Coroner { // FIXME: remove once going back to project dependencies + + /** + * Used to cancel the Coroner after calling `watch`. + * The result of this Awaitable will be `true` if it has been cancelled. + */ + trait WatchHandle extends Awaitable[Boolean] { + /** + * Will try to ensure that the Coroner has finished reporting. + */ + def cancel(): Unit + } + + private class WatchHandleImpl(startAndStopDuration: FiniteDuration) + extends WatchHandle { + val cancelPromise = Promise[Boolean] + val startedLatch = new CountDownLatch(1) + val finishedLatch = new CountDownLatch(1) + + def waitForStart(): Unit = { + startedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) + } + + def started(): Unit = startedLatch.countDown() + + def finished(): Unit = finishedLatch.countDown() + + def expired(): Unit = cancelPromise.trySuccess(false) + + override def cancel(): Unit = { + cancelPromise.trySuccess(true) + finishedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) + } + + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { + result(atMost) + this + } + + override def result(atMost: Duration)(implicit permit: CanAwait): Boolean = + try { Await.result(cancelPromise.future, atMost) } catch { case _: TimeoutException ⇒ false } + + } + + val defaultStartAndStopDuration = 1.second + + /** + * Ask the Coroner to print a report if it is not cancelled by the given deadline. + * The returned handle can be used to perform the cancellation. + * + * If displayThreadCounts is set to true, then the Coroner will print thread counts during start + * and stop. + */ + def watch(duration: FiniteDuration, reportTitle: String, out: PrintStream, + startAndStopDuration: FiniteDuration = defaultStartAndStopDuration, + displayThreadCounts: Boolean = false): WatchHandle = { + + val watchedHandle = new WatchHandleImpl(startAndStopDuration) + + def triggerReportIfOverdue(duration: Duration): Unit = { + val threadMx = ManagementFactory.getThreadMXBean() + val startThreads = threadMx.getThreadCount + if (displayThreadCounts) { + threadMx.resetPeakThreadCount() + out.println(s"Coroner Thread Count starts at $startThreads in $reportTitle") + } + watchedHandle.started() + try { + if (!Await.result(watchedHandle, duration)) { + watchedHandle.expired() + out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...") + try printReport(reportTitle, out) catch { + case NonFatal(ex) ⇒ { + out.println("Error displaying Coroner's Report") + ex.printStackTrace(out) + } + } + } + } finally { + if (displayThreadCounts) { + val endThreads = threadMx.getThreadCount + out.println(s"Coroner Thread Count started at $startThreads, ended at $endThreads, peaked at ${threadMx.getPeakThreadCount} in $reportTitle") + } + out.flush() + watchedHandle.finished() + } + } + new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner").start() + watchedHandle.waitForStart() + watchedHandle + } + + /** + * Print a report containing diagnostic information. + */ + def printReport(reportTitle: String, out: PrintStream) { + import out.println + + val osMx = ManagementFactory.getOperatingSystemMXBean() + val rtMx = ManagementFactory.getRuntimeMXBean() + val memMx = ManagementFactory.getMemoryMXBean() + val threadMx = ManagementFactory.getThreadMXBean() + + println(s"""#Coroner's Report: $reportTitle + #OS Architecture: ${osMx.getArch()} + #Available processors: ${osMx.getAvailableProcessors()} + #System load (last minute): ${osMx.getSystemLoadAverage()} + #VM start time: ${new Date(rtMx.getStartTime())} + #VM uptime: ${rtMx.getUptime()}ms + #Heap usage: ${memMx.getHeapMemoryUsage()} + #Non-heap usage: ${memMx.getNonHeapMemoryUsage()}""".stripMargin('#')) + + def dumpAllThreads: Seq[ThreadInfo] = { + threadMx.dumpAllThreads( + threadMx.isObjectMonitorUsageSupported, + threadMx.isSynchronizerUsageSupported) + } + + def findDeadlockedThreads: (Seq[ThreadInfo], String) = { + val (ids, desc) = if (threadMx.isSynchronizerUsageSupported()) { + (threadMx.findDeadlockedThreads(), "monitors and ownable synchronizers") + } else { + (threadMx.findMonitorDeadlockedThreads(), "monitors, but NOT ownable synchronizers") + } + if (ids == null) { + (Seq.empty, desc) + } else { + val maxTraceDepth = 1000 // Seems deep enough + (threadMx.getThreadInfo(ids, maxTraceDepth), desc) + } + } + + def printThreadInfos(threadInfos: Seq[ThreadInfo]) = { + if (threadInfos.isEmpty) { + println("None") + } else { + for (ti ← threadInfos.sortBy(_.getThreadName)) { println(threadInfoToString(ti)) } + } + } + + def threadInfoToString(ti: ThreadInfo): String = { + val sb = new java.lang.StringBuilder + sb.append("\"") + sb.append(ti.getThreadName) + sb.append("\" Id=") + sb.append(ti.getThreadId) + sb.append(" ") + sb.append(ti.getThreadState) + + if (ti.getLockName != null) { + sb.append(" on " + ti.getLockName) + } + + if (ti.getLockOwnerName != null) { + sb.append(" owned by \"") + sb.append(ti.getLockOwnerName) + sb.append("\" Id=") + sb.append(ti.getLockOwnerId) + } + + if (ti.isSuspended) { + sb.append(" (suspended)") + } + + if (ti.isInNative) { + sb.append(" (in native)") + } + + sb.append('\n') + + def appendMsg(msg: String, o: Any) = { + sb.append(msg) + sb.append(o) + sb.append('\n') + } + + val stackTrace = ti.getStackTrace + for (i ← 0 until stackTrace.length) { + val ste = stackTrace(i) + appendMsg("\tat ", ste) + if (i == 0 && ti.getLockInfo != null) { + import java.lang.Thread.State._ + ti.getThreadState match { + case BLOCKED ⇒ appendMsg("\t- blocked on ", ti.getLockInfo) + case WAITING ⇒ appendMsg("\t- waiting on ", ti.getLockInfo) + case TIMED_WAITING ⇒ appendMsg("\t- waiting on ", ti.getLockInfo) + case _ ⇒ + } + } + + for (mi ← ti.getLockedMonitors if mi.getLockedStackDepth == i) + appendMsg("\t- locked ", mi) + } + + val locks = ti.getLockedSynchronizers + if (locks.length > 0) { + appendMsg("\n\tNumber of locked synchronizers = ", locks.length) + for (li ← locks) appendMsg("\t- ", li) + } + sb.append('\n') + return sb.toString + } + + println("All threads:") + printThreadInfos(dumpAllThreads) + + val (deadlockedThreads, deadlockDesc) = findDeadlockedThreads + println(s"Deadlocks found for $deadlockDesc:") + printThreadInfos(deadlockedThreads) + } + +} + +/** + * Mixin for tests that should be watched by the Coroner. The `startCoroner` + * and `stopCoroner` methods should be called before and after the test runs. + * The Coroner will display its report if the test takes longer than the + * (dilated) `expectedTestDuration` to run. + * + * If displayThreadCounts is set to true, then the Coroner will print thread + * counts during start and stop. + */ +trait WatchedByCoroner { + self: TestKit ⇒ + + @volatile private var coronerWatch: Coroner.WatchHandle = _ + + final def startCoroner() { + coronerWatch = Coroner.watch(expectedTestDuration.dilated, getClass.getName, System.err, + startAndStopDuration.dilated, displayThreadCounts) + } + + final def stopCoroner() { + coronerWatch.cancel() + coronerWatch = null + } + + def expectedTestDuration: FiniteDuration + + def displayThreadCounts: Boolean = true + + def startAndStopDuration: FiniteDuration = Coroner.defaultStartAndStopDuration +} diff --git a/server/exercise/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/server/exercise/src/test/scala/akka/stream/testkit/StreamTestKit.scala new file mode 100644 index 00000000..63d5dabc --- /dev/null +++ b/server/exercise/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.actor.ActorSystem +import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } +import akka.testkit.TestProbe +import org.reactivestreams.{ Publisher, Subscriber, Subscription } + +import scala.concurrent.duration.FiniteDuration + +// TODO: until `akka-stream-testkit-experimental` package is released, we need a copy of this file for testing + +object StreamTestKit { + + /** + * Subscribes the subscriber and completes after the first request. + */ + def lazyEmptyPublisher[T]: Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + subscriber.onSubscribe(CompletedSubscription(subscriber)) + } + + /** + * Signals error to subscribers immediately, before handing out subscription. + */ + def errorPublisher[T](cause: Throwable): Publisher[T] = ErrorPublisher(cause, "error").asInstanceOf[Publisher[T]] + + def emptyPublisher[T](): Publisher[T] = EmptyPublisher[T] + + /** + * Subscribes the subscriber and signals error after the first request. + */ + def lazyErrorPublisher[T](cause: Throwable): Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + subscriber.onSubscribe(FailedSubscription(subscriber, cause)) + } + + private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { + override def request(elements: Long): Unit = subscriber.onError(cause) + override def cancel(): Unit = () + } + + private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { + override def request(elements: Long): Unit = subscriber.onComplete() + override def cancel(): Unit = () + } + + class AutoPublisher[T](probe: PublisherProbe[T], initialPendingRequests: Long = 0) { + val subscription = probe.expectSubscription() + var pendingRequests = initialPendingRequests + + def sendNext(elem: T): Unit = { + if (pendingRequests == 0) pendingRequests = subscription.expectRequest() + pendingRequests -= 1 + subscription.sendNext(elem) + } + + def sendComplete(): Unit = subscription.sendComplete() + + def sendError(cause: Exception): Unit = subscription.sendError(cause) + } + + sealed trait SubscriberEvent + case class OnSubscribe(subscription: Subscription) extends SubscriberEvent + case class OnNext[I](element: I) extends SubscriberEvent + case object OnComplete extends SubscriberEvent + case class OnError(cause: Throwable) extends SubscriberEvent + + sealed trait PublisherEvent + case class Subscribe(subscription: Subscription) extends PublisherEvent + case class CancelSubscription(subscription: Subscription) extends PublisherEvent + case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent + + case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription { + def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements) + def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) + + def expectRequest(n: Long): Unit = publisherProbe.expectMsg(RequestMore(this, n)) + def expectRequest(): Long = publisherProbe.expectMsgPF() { + case RequestMore(sub, n) if sub eq this ⇒ n + } + + def expectCancellation(): Unit = publisherProbe.fishForMessage() { + case CancelSubscription(sub) if sub eq this ⇒ true + case RequestMore(sub, _) if sub eq this ⇒ false + } + + def sendNext(element: I): Unit = subscriber.onNext(element) + def sendComplete(): Unit = subscriber.onComplete() + def sendError(cause: Exception): Unit = subscriber.onError(cause) + } + + object SubscriberProbe { + def apply[I]()(implicit system: ActorSystem): SubscriberProbe[I] = new SubscriberProbe[I]() + } + + class SubscriberProbe[I]()(implicit system: ActorSystem) extends Subscriber[I] { + val probe = TestProbe() + + def expectSubscription(): Subscription = probe.expectMsgType[OnSubscribe].subscription + def expectEvent(event: SubscriberEvent): Unit = probe.expectMsg(event) + def expectNext(element: I): Unit = probe.expectMsg(OnNext(element)) + def expectNext(e1: I, e2: I, es: I*): Unit = { + val all = e1 +: e2 +: es + all.foreach(e ⇒ probe.expectMsg(OnNext(e))) + } + + def expectNext(): I = probe.expectMsgType[OnNext[I]].element + def expectComplete(): Unit = probe.expectMsg(OnComplete) + + def expectError(cause: Throwable): Unit = probe.expectMsg(OnError(cause)) + def expectError(): Throwable = probe.expectMsgType[OnError].cause + + def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I] = { + probe.fishForMessage(hint = s"OnNext($element) or ${cause.getClass.getName}") { + case OnNext(n) ⇒ true + case OnError(`cause`) ⇒ true + } match { + case OnNext(n: I) ⇒ Right(n) + case OnError(err) ⇒ Left(err) + } + } + + def expectErrorOrSubscriptionFollowedByError(cause: Throwable): Unit = { + val t = expectErrorOrSubscriptionFollowedByError() + assert(t == cause, s"expected $cause, found $cause") + } + + def expectErrorOrSubscriptionFollowedByError(): Throwable = + probe.expectMsgPF() { + case s: OnSubscribe ⇒ + s.subscription.request(1) + expectError() + case OnError(cause) ⇒ cause + } + + def expectCompletedOrSubscriptionFollowedByComplete(): Unit = { + probe.expectMsgPF() { + case s: OnSubscribe ⇒ + s.subscription.request(1) + expectComplete() + case OnComplete ⇒ + } + } + + def expectNoMsg(): Unit = probe.expectNoMsg() + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + + def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription) + def onNext(element: I): Unit = probe.ref ! OnNext(element) + def onComplete(): Unit = probe.ref ! OnComplete + def onError(cause: Throwable): Unit = probe.ref ! OnError(cause) + } + + object PublisherProbe { + def apply[I]()(implicit system: ActorSystem): PublisherProbe[I] = new PublisherProbe[I]() + } + + class PublisherProbe[I]()(implicit system: ActorSystem) extends Publisher[I] { + val probe: TestProbe = TestProbe() + + def subscribe(subscriber: Subscriber[_ >: I]): Unit = { + val subscription: PublisherProbeSubscription[I] = new PublisherProbeSubscription[I](subscriber, probe) + probe.ref ! Subscribe(subscription) + subscriber.onSubscribe(subscription) + } + + def expectSubscription(): PublisherProbeSubscription[I] = + probe.expectMsgType[Subscribe].subscription.asInstanceOf[PublisherProbeSubscription[I]] + + def expectRequest(subscription: Subscription, n: Int): Unit = probe.expectMsg(RequestMore(subscription, n)) + + def expectNoMsg(): Unit = probe.expectNoMsg() + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + + def getPublisher: Publisher[I] = this + } +} diff --git a/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflowTest.scala b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflowTest.scala new file mode 100644 index 00000000..67e303ad --- /dev/null +++ b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflowTest.scala @@ -0,0 +1,501 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.{FlowMaterializer, MaterializerSettings} +import akka.stream.scaladsl._ +import akka.stream.testkit.{StreamTestKit, AkkaSpec} +import com.eigengo.lift.exercise.AccelerometerValue +import com.typesafe.config.ConfigFactory +import scala.io.{Source => IOSource} + +class GestureWorkflowTest extends AkkaSpec(ConfigFactory.load("classification.conf")) with GestureWorkflows { + + import ClassificationAssertions._ + import StreamTestKit._ + + val name = "tap" + val config = system.settings.config + + val settings = MaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1024) + + implicit val materializer = FlowMaterializer(settings) + + val accelerometerData = Option(getClass.getResource("/samples/tap.csv")).map { dataFile => + IOSource.fromURL(dataFile, "UTF-8").getLines().map(line => { val List(x, y, z) = line.split(",").toList.map(_.toInt); AccelerometerValue(x, y, z) }) + }.get.toList + val noTapEvents = accelerometerData.slice(600, accelerometerData.length) + val tapEvents = accelerometerData.slice(0, 600) + + "IdentifyGestureEvents" must { + + def component(in: PublisherProbe[AccelerometerValue], out: SubscriberProbe[AccelerometerValue], tap: SubscriberProbe[Fact]) = FlowGraph { implicit builder => + val identify = IdentifyGestureEvents() + + builder.importPartialFlowGraph(identify.graph) + + builder.attachSource(identify.in, Source(in)) + builder.attachSink(identify.out, Sink(out)) + builder.attachSink(identify.tap, Sink(tap)) + } + + "in messages should pass through unaltered and tap's are not detected [no tap request]" in { + val msgs = noTapEvents + val inProbe = PublisherProbe[AccelerometerValue]() + val outProbe = SubscriberProbe[AccelerometerValue]() + val tapProbe = SubscriberProbe[Fact]() + + component(inProbe, outProbe, tapProbe).run() + val inPub = inProbe.expectSubscription() + val outSub = outProbe.expectSubscription() + val tapSub = tapProbe.expectSubscription() + + outSub.request(msgs.length) + tapSub.request(msgs.length) + for (msg <- msgs) { + inPub.sendNext(msg) + } + inPub.sendComplete() + + outProbe.expectNext(msgs(0), msgs(1), msgs.drop(2): _*) + for ((msg, index) <- msgs.zipWithIndex) { + tapProbe.expectNext(Unknown) + } + } + + "in messages should pass through unaltered and tap is detected [tap request]" in { + val msgs = tapEvents + val gestureWindow = List(256 until 290, 341 until 344, 379 until 408, 546 until 576).flatten.toList + val inProbe = PublisherProbe[AccelerometerValue]() + val outProbe = SubscriberProbe[AccelerometerValue]() + val tapProbe = SubscriberProbe[Fact]() + + component(inProbe, outProbe, tapProbe).run() + val inPub = inProbe.expectSubscription() + val outSub = outProbe.expectSubscription() + val tapSub = tapProbe.expectSubscription() + + outSub.request(msgs.length) + tapSub.request(msgs.length) + for (msg <- msgs) { + inPub.sendNext(msg) + } + inPub.sendComplete() + + outProbe.expectNext(msgs(0), msgs(1), msgs.drop(2): _*) + for ((msg, index) <- msgs.zipWithIndex) { + val event = tapProbe.expectNext() + if (gestureWindow.contains(index)) { + event shouldBe a[Gesture] + event.asInstanceOf[Gesture].name should be("tap") + event.asInstanceOf[Gesture].matchProbability should be > 0.75 + } else { + event should be(Unknown) + } + } + } + + } + + "MergeSignals" must { + + val merge = MergeSignals[Fact, Fact](3) { (obs: Set[Fact]) => + require(obs.nonEmpty) + + if (obs.filter(_.isInstanceOf[Gesture]).asInstanceOf[Set[Gesture]].filter(_.name == name).nonEmpty) { + obs.filter(_.isInstanceOf[Gesture]).asInstanceOf[Set[Gesture]].filter(_.name == name).maxBy(_.matchProbability) + } else { + obs.head + } + } + + def component(inProbe: List[PublisherProbe[Fact]], out: SubscriberProbe[Fact]) = FlowGraph { implicit builder => + builder.importPartialFlowGraph(merge.graph) + + for (n <- 0 until 3) { + builder.attachSource(merge.in(n), Source(inProbe(n))) + } + builder.attachSink(merge.out, Sink(out)) + } + + "correctly merge tagged accelerometer data values by selecting gesture with largest matching probability" in { + val msgs: List[Fact] = List(Gesture(s"$name-other", 0.90), Unknown, Gesture(name, 0.80)) + val inProbe = (0 until 3).map(_ => PublisherProbe[Fact]()).toList + val out = SubscriberProbe[Fact]() + + component(inProbe, out).run() + val inPub = inProbe.map(_.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(msgs(2)) + } + + "correctly merge tagged accelerometer data values when no signals are a gesture match" in { + val msgs: List[Fact] = List(Unknown, Gesture(s"$name-other", 0.70), Unknown) + val inProbe = (0 until 3).map(_ => PublisherProbe[Fact]()).toList + val out = SubscriberProbe[Fact]() + + component(inProbe, out).run() + val inPub = inProbe.map(_.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(msgs(0)) + } + + } + + "ModulateSensorNet" must { + + val modulate = ModulateSensorNet[String, Fact, Int]((0 until 3).toSet) + + def component(in: Map[Int, PublisherProbe[String]], transform: PublisherProbe[Fact], out: Map[Int, SubscriberProbe[Bind[String]]]) = FlowGraph { implicit builder => + builder.importPartialFlowGraph(modulate.graph) + + for (loc <- 0 until 3) { + builder.attachSource(modulate.in(loc), Source(in(loc))) + } + builder.attachSource(modulate.transform, Source(transform)) + for (loc <- 0 until 3) { + builder.attachSink(modulate.out(loc), Sink(out(loc))) + } + } + + "request for output on at least one wire (input values present) should be correctly transformed" in { + val msgs = List("one", "two", "three") + val inProbe = (0 until 3).map(n => (n, PublisherProbe[String]())).toMap + val outProbe = (0 until 3).map(n => (n, SubscriberProbe[Bind[String]]())).toMap + val transformProbe = PublisherProbe[Fact]() + + component(inProbe, transformProbe, outProbe).run() + val inPub = inProbe.map { case (n, pub) => (n, pub.expectSubscription()) }.toMap + val outSub = outProbe.map { case (n, sub) => (n, sub.expectSubscription()) }.toMap + val transformPub = transformProbe.expectSubscription() + + outSub(1).request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + transformPub.sendNext(Unknown) + + outProbe(1).expectNext(Bind(Predicate(Unknown), "two")) + } + + "request for outputs on all 3 wires (input values present) should be correctly transformed [no gesture]" in { + val msgs = List("one", "two", "three") + val inProbe = (0 until 3).map(n => (n, PublisherProbe[String]())).toMap + val outProbe = (0 until 3).map(n => (n, SubscriberProbe[Bind[String]]())).toMap + val transformProbe = PublisherProbe[Fact]() + + component(inProbe, transformProbe, outProbe).run() + val inPub = inProbe.map { case (n, pub) => (n, pub.expectSubscription()) }.toMap + val outSub = outProbe.map { case (n, sub) => (n, sub.expectSubscription()) }.toMap + val transformPub = transformProbe.expectSubscription() + + for (n <- 0 until 3) { + outSub(n).request(1) + } + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + transformPub.sendNext(Unknown) + + for (n <- 0 until msgs.length) { + outProbe(n % 3).expectNext(Bind(Predicate(Unknown), msgs(n))) + } + } + + "request for outputs on all 3 wires (input values present) should be correctly transformed [gesture present]" in { + val msgs = List("one", "two", "three") + val inProbe = (0 until 3).map(n => (n, PublisherProbe[String]())).toMap + val outProbe = (0 until 3).map(n => (n, SubscriberProbe[Bind[String]]())).toMap + val transformProbe = PublisherProbe[Fact]() + + component(inProbe, transformProbe, outProbe).run() + val inPub = inProbe.map { case (n, pub) => (n, pub.expectSubscription()) }.toMap + val outSub = outProbe.map { case (n, sub) => (n, sub.expectSubscription()) }.toMap + val transformPub = transformProbe.expectSubscription() + + for (n <- 0 until 3) { + outSub(n).request(1) + } + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + transformPub.sendNext(Gesture("transform", 0.42)) + + for (n <- 0 until msgs.length) { + outProbe(n % 3).expectNext(Bind(Predicate(Gesture("transform", 0.42)), msgs(n))) + } + } + + "multiple requests for output on all 3 wires (input values present) should be correctly transformed [no gesture]" in { + val msgs = List("one", "two", "three", "four", "five", "six") + val inProbe = (0 until 3).map(n => (n, PublisherProbe[String]())).toMap + val outProbe = (0 until 3).map(n => (n, SubscriberProbe[Bind[String]]())).toMap + val transformProbe = PublisherProbe[Fact]() + + component(inProbe, transformProbe, outProbe).run() + val inPub = inProbe.map { case (n, pub) => (n, pub.expectSubscription()) }.toMap + val outSub = outProbe.map { case (n, sub) => (n, sub.expectSubscription()) }.toMap + val transformPub = transformProbe.expectSubscription() + + for (n <- 0 until 3) { + outSub(n).request(2) + } + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + transformPub.sendNext(Unknown) + transformPub.sendNext(Unknown) + + for (n <- 0 until msgs.length) { + outProbe(n % 3).expectNext(Bind(Predicate(Unknown), msgs(n))) + } + } + + "multiple requests for output on all 3 wires (input values present) should be correctly transformed [gesture present]" in { + val msgs = List("one", "two", "three", "four", "five", "six") + val inProbe = (0 until 3).map(n => (n, PublisherProbe[String]())).toMap + val outProbe = (0 until 3).map(n => (n, SubscriberProbe[Bind[String]]())).toMap + val transformProbe = PublisherProbe[Fact]() + + component(inProbe, transformProbe, outProbe).run() + val inPub = inProbe.map { case (n, pub) => (n, pub.expectSubscription()) }.toMap + val outSub = outProbe.map { case (n, sub) => (n, sub.expectSubscription()) }.toMap + val transformPub = transformProbe.expectSubscription() + + for (n <- 0 until 3) { + outSub(n).request(2) + } + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + transformPub.sendNext(Gesture("transform", 0.42)) + transformPub.sendNext(Unknown) + + for (n <- 0 until msgs.length) { + if (n / 3 == 0) { + outProbe(n % 3).expectNext(Bind(Predicate(Gesture("transform", 0.42)), msgs(n))) + } else { + outProbe(n % 3).expectNext(Bind(Predicate(Unknown), msgs(n))) + } + } + } + + } + + "GestureClassification" must { + + def component(inClassify: List[PublisherProbe[AccelerometerValue]], outClassify: List[SubscriberProbe[AccelerometerValue]], inModulate: List[PublisherProbe[AccelerometerValue]], outModulate: List[SubscriberProbe[Bind[AccelerometerValue]]]) = FlowGraph { implicit builder => + require(inClassify.size == outClassify.size) + require(inModulate.size == outModulate.size) + + val workflow = GestureClassification[Int]((0 until inClassify.size).toSet, (0 until inModulate.size).toSet) + + builder.importPartialFlowGraph(workflow.graph) + + // Wire up classification sensors + for ((probe, index) <- inClassify.zipWithIndex) { + builder.attachSource(workflow.inputTap(index), Source(probe)) + } + for ((probe, index) <- outClassify.zipWithIndex) { + builder.attachSink(workflow.outputTap(index), Sink(probe)) + } + // Wire up modulation sensors + for ((probe, index) <- inModulate.zipWithIndex) { + builder.attachSource(workflow.inputModulate(index), Source(probe)) + } + for ((probe, index) <- outModulate.zipWithIndex) { + builder.attachSink(workflow.outputModulate(index), Sink(probe)) + } + } + + "leave classify sensor data and modulation sensor data unaltered when no taps are detected [1 sensor]" in { + val size = 1 + val classifyEvents = noTapEvents + val modulateEvents = (0 until classifyEvents.length).map(n => AccelerometerValue(n, n, n)) + val inClassify = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outClassify = (0 until size).map(_ => SubscriberProbe[AccelerometerValue]()).toList + val inModulate = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outModulate = (0 until size).map(_ => SubscriberProbe[Bind[AccelerometerValue]]()).toList + + component(inClassify, outClassify, inModulate, outModulate).run() + val inPubClassify = inClassify.map(_.expectSubscription()).toList + val outSubClassify = outClassify.map(_.expectSubscription()).toList + val inPubModulate = inModulate.map(_.expectSubscription()).toList + val outSubModulate = outModulate.map(_.expectSubscription()).toList + for (probe <- outSubClassify) { + probe.request((classifyEvents.length / size) + 1) + } + for (probe <- outSubModulate) { + probe.request((modulateEvents.length / size) + 1) + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + inPubClassify(n % size).sendNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + inPubModulate(n % size).sendNext(msg) + } + // Ensures all remaining data is flushed and facts allow merge to complete latching modulation data through! + for (n <- 0 until size) { + inPubClassify(n).sendComplete() + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + outClassify(n % size).expectNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + outModulate(n % size).expectNext(Bind(Predicate(Unknown), msg)) + } + } + + "leave classify sensor data and modulation sensor data unaltered when no taps are detected [2 sensors]" in { + val size = 2 + val classifyEvents = noTapEvents + val modulateEvents = (0 until classifyEvents.length).map(n => AccelerometerValue(n, n, n)) + val inClassify = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outClassify = (0 until size).map(_ => SubscriberProbe[AccelerometerValue]()).toList + val inModulate = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outModulate = (0 until size).map(_ => SubscriberProbe[Bind[AccelerometerValue]]()).toList + + component(inClassify, outClassify, inModulate, outModulate).run() + val inPubClassify = inClassify.map(_.expectSubscription()).toList + val outSubClassify = outClassify.map(_.expectSubscription()).toList + val inPubModulate = inModulate.map(_.expectSubscription()).toList + val outSubModulate = outModulate.map(_.expectSubscription()).toList + for (probe <- outSubClassify) { + probe.request((classifyEvents.length / size) + 1) + } + for (probe <- outSubModulate) { + probe.request((modulateEvents.length / size) + 1) + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + inPubClassify(n % size).sendNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + inPubModulate(n % size).sendNext(msg) + } + // Ensures all remaining data is flushed and facts allow merge to complete latching modulation data through! + for (n <- 0 until size) { + inPubClassify(n).sendComplete() + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + outClassify(n % size).expectNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + outModulate(n % size).expectNext(Bind(Predicate(Unknown), msg)) + } + } + + "leave classify sensor data unaltered and modulation sensor data tagged when taps are detected [1 sensor]" in { + val size = 1 + val classifyEvents = tapEvents + val gestureWindow = List(256 until 290, 341 until 344, 379 until 408, 546 until 576).flatten.toList + val modulateEvents = (0 until classifyEvents.length).map(n => AccelerometerValue(n, n, n)) + val inClassify = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outClassify = (0 until size).map(_ => SubscriberProbe[AccelerometerValue]()).toList + val inModulate = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outModulate = (0 until size).map(_ => SubscriberProbe[Bind[AccelerometerValue]]()).toList + + component(inClassify, outClassify, inModulate, outModulate).run() + val inPubClassify = inClassify.map(_.expectSubscription()).toList + val outSubClassify = outClassify.map(_.expectSubscription()).toList + val inPubModulate = inModulate.map(_.expectSubscription()).toList + val outSubModulate = outModulate.map(_.expectSubscription()).toList + for (probe <- outSubClassify) { + probe.request((classifyEvents.length / size) + 1) + } + for (probe <- outSubModulate) { + probe.request((modulateEvents.length / size) + 1) + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + inPubClassify(n % size).sendNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + inPubModulate(n % size).sendNext(msg) + } + // Ensures all remaining data is flushed and facts allow merge to complete latching modulation data through! + for (n <- 0 until size) { + inPubClassify(n).sendComplete() + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + outClassify(n % size).expectNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + if (gestureWindow.contains(n)) { + val event = outModulate(n % size).expectNext() + event.value should be(msg) + event.assertion shouldBe a[Predicate] + event.assertion.asInstanceOf[Predicate].fact shouldBe a[Gesture] + event.assertion.asInstanceOf[Predicate].fact.asInstanceOf[Gesture].name should be(name) + } else { + outModulate(n % size).expectNext(Bind(Predicate(Unknown), msg)) + } + } + } + + "leave classify sensor data unaltered and modulation sensor data tagged when taps are detected [2 sensors]" in { + val size = 2 + val classifyEvents = tapEvents + val gestureWindow = List(246 until 280, 354 until 408, 522 until 552).flatten.toList + val modulateEvents = (0 until classifyEvents.length).map(n => AccelerometerValue(n, n, n)) + val inClassify = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outClassify = (0 until size).map(_ => SubscriberProbe[AccelerometerValue]()).toList + val inModulate = (0 until size).map(_ => PublisherProbe[AccelerometerValue]()).toList + val outModulate = (0 until size).map(_ => SubscriberProbe[Bind[AccelerometerValue]]()).toList + + component(inClassify, outClassify, inModulate, outModulate).run() + val inPubClassify = inClassify.map(_.expectSubscription()).toList + val outSubClassify = outClassify.map(_.expectSubscription()).toList + val inPubModulate = inModulate.map(_.expectSubscription()).toList + val outSubModulate = outModulate.map(_.expectSubscription()).toList + for (probe <- outSubClassify) { + probe.request((classifyEvents.length / size) + 1) + } + for (probe <- outSubModulate) { + probe.request((modulateEvents.length / size) + 1) + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + inPubClassify(n % size).sendNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + inPubModulate(n % size).sendNext(msg) + } + // Ensures all remaining data is flushed and facts allow merge to complete latching modulation data through! + for (n <- 0 until size) { + inPubClassify(n).sendComplete() + } + + for ((msg, n) <- classifyEvents.zipWithIndex) { + outClassify(n % size).expectNext(msg) + } + for ((msg, n) <- modulateEvents.zipWithIndex) { + if (gestureWindow.contains(n)) { + val event = outModulate(n % size).expectNext() + event.value should be(msg) + event.assertion shouldBe a[Predicate] + event.assertion.asInstanceOf[Predicate].fact shouldBe a[Gesture] + event.assertion.asInstanceOf[Predicate].fact.asInstanceOf[Gesture].name should be(name) + } else { + outModulate(n % size).expectNext(Bind(Predicate(Unknown), msg)) + } + } + } + + } + +} diff --git a/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindowTest.scala b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindowTest.scala new file mode 100644 index 00000000..7d9abc47 --- /dev/null +++ b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindowTest.scala @@ -0,0 +1,140 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.{FlowMaterializer, MaterializerSettings} +import akka.stream.scaladsl._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + +class SlidingWindowTest extends AkkaSpec { + + import FlowGraphImplicits._ + import StreamTestKit._ + + val settings = MaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = FlowMaterializer(settings) + + def sample(in: Source[String], out: Sink[List[String]]) = FlowGraph { implicit builder => + in ~> Flow[String].transform(() => SlidingWindow[String](5)) ~> out + } + + "SlidingWindow" must { + "SlidingWindow should receive elements, but not emit them whilst its internal buffer is not full" in { + val msgs = List("one", "two", "three") + // Simulate source that outputs messages and then blocks + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length) + for (msg <- msgs) { + pub.sendNext(msg) + } + + out.expectNoMsg() + } + + "a saturated SlidingWindow should emit elements in the order they are received" in { + val msgs = List("one", "two", "three", "four", "five", "six", "seven") + // Simulate source that outputs messages and then blocks + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length) + for (msg <- msgs) { + pub.sendNext(msg) + } + + out.expectNext(msgs.slice(0, 5), msgs.slice(1, 6), msgs.slice(2, 7)) + out.expectNoMsg() // since buffer is saturated and no more messages are arriving + } + + "closing a partially full SlidingWindow should flush buffered elements" in { + val msgs = List("one", "two", "three") + // Simulate source that outputs messages and then completes + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length + 1) // + OnComplete + for (msg <- msgs) { + pub.sendNext(msg) + } + pub.sendComplete() + + out.expectNext(msgs.slice(0, 5), msgs.slice(1, 6), msgs.slice(2, 7)) + out.expectComplete() + } + + "closing a saturated SlidingWindow should flush buffered elements" in { + val msgs = List("one", "two", "three", "four", "five", "six", "seven") + // Simulate source that outputs messages and then completes + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length + 1) // + OnComplete + for (msg <- msgs) { + pub.sendNext(msg) + } + pub.sendComplete() + + out.expectNext(msgs.slice(0, 5), msgs.slice(1, 6), msgs.slice(2, 7), msgs.slice(3, 8), msgs.slice(4, 9), msgs.slice(5, 10), msgs.slice(6, 11)) + out.expectComplete() + } + + "exceptions (i.e. catastrophic stream errors) on a partially full SlidingWindow materialise 'immediately'" in { + val exn = new RuntimeException("fake error") + val msgs = List("one", "two", "three") + // Simulate source that outputs messages and then errors + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length) + for (msg <- msgs) { + pub.sendNext(msg) + } + pub.sendError(exn) + + out.expectError(exn) + } + + "exceptions (i.e. catastrophic stream errors) on a saturated SlidingWindow materialise 'immediately'" in { + val exn = new RuntimeException("fake error") + val msgs = List("one", "two", "three", "four", "five", "six") + // Simulate source that outputs messages and then errors + val in = PublisherProbe[String]() + val out = SubscriberProbe[List[String]]() + + val workflow = sample(Source(in), Sink(out)) + workflow.run() + val pub = in.expectSubscription() + val sub = out.expectSubscription() + sub.request(msgs.length) + for (msg <- msgs) { + pub.sendNext(msg) + } + pub.sendError(exn) + + out.expectNext(msgs.slice(0, 5), msgs.slice(1, 6)) + out.expectError(exn) + } + } + +} diff --git a/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodesTest.scala b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodesTest.scala new file mode 100644 index 00000000..55f502cb --- /dev/null +++ b/server/exercise/src/test/scala/com/eigengo/lift/exercise/classifiers/workflows/ZipNodesTest.scala @@ -0,0 +1,294 @@ +package com.eigengo.lift.exercise.classifiers.workflows + +import akka.stream.{FlowMaterializer, MaterializerSettings} +import akka.stream.scaladsl._ +import akka.stream.testkit.{StreamTestKit, AkkaSpec} + +class ZipNodesTest extends AkkaSpec { + + import FlowGraphImplicits._ + import StreamTestKit._ + + val settings = MaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 4) + + implicit val materializer = FlowMaterializer(settings) + + "ZipN(3)" must { + + def merge(in: Set[Source[String]], out: Sink[Set[String]]) = FlowGraph { implicit builder => + val zip = ZipN[String](in.size) + + for ((probe, index) <- in.zipWithIndex) { + probe ~> zip.in(index) + } + zip.out ~> out + } + + "not output anything if it receives nothing" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + + out.expectNoMsg() + } + + "not output anything if only one of its inputs receives a message" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNoMsg() + } + + "not output anything if only two of its inputs receive messages" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNoMsg() + } + + "output if all three inputs receive messages" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(Set("one", "two", "three")) + } + + "output multiple messages in the order they were received" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three", "four", "five", "six", "seven") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(3) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(Set("one", "two", "three"), Set("four", "five", "six")) + } + + "closing an upstream should close the entire flow" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three", "four", "five") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(2) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + inPub(2).sendComplete() + + out.expectNext(Set("one", "two", "three")) + out.expectComplete() + } + + "an upstream error should error the entire flow" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val exn = new RuntimeException("fake error") + val msgs = List("one", "two", "three", "four", "five") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(2) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + inPub(0).sendError(exn) + + out.expectNext(Set("one", "two", "three")) + out.expectError(exn) + } + } + + "ZipSet(Set(1, 2, 3))" must { + + def merge(in: Set[Source[String]], out: Sink[Set[String]]) = FlowGraph { implicit builder => + val zip = ZipSet[String, Int]((0 until in.size).toSet) + + for ((probe, index) <- in.zipWithIndex) { + probe ~> zip.in(index) + } + zip.out ~> out + } + + "not output anything if it receives nothing" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + + out.expectNoMsg() + } + + "not output anything if only one of its inputs receives a message" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNoMsg() + } + + "not output anything if only two of its inputs receive messages" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNoMsg() + } + + "output if all three inputs receive messages" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(1) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(Set("one", "two", "three")) + } + + "output multiple messages in the order they were received" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three", "four", "five", "six", "seven") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(3) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + + out.expectNext(Set("one", "two", "three"), Set("four", "five", "six")) + } + + "closing an upstream should close the entire flow" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val msgs = List("one", "two", "three", "four", "five") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(2) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + inPub(2).sendComplete() + + out.expectNext(Set("one", "two", "three")) + out.expectComplete() + } + + "an upstream error should error the entire flow" in { + val inProbe = (0 until 3).map(_ => PublisherProbe[String]()) + val out = SubscriberProbe[Set[String]]() + val exn = new RuntimeException("fake error") + val msgs = List("one", "two", "three", "four", "five") + + val workflow = merge(inProbe.map(Source.apply[String]).toSet, Sink(out)) + workflow.run() + val inPub = inProbe.map(in => in.expectSubscription()) + val sub = out.expectSubscription() + + sub.request(2) + for ((msg, n) <- msgs.zipWithIndex) { + inPub(n % 3).sendNext(msg) + } + inPub(0).sendError(exn) + + out.expectNext(Set("one", "two", "three")) + out.expectError(exn) + } + } + +} diff --git a/server/project/Dependencies.scala b/server/project/Dependencies.scala index 6161c779..e32e00d6 100644 --- a/server/project/Dependencies.scala +++ b/server/project/Dependencies.scala @@ -15,9 +15,12 @@ object Dependencies { object streams { val version = "1.0-M2" - val core = "com.typesafe.akka" %% "akka-stream-experimental" % version - val http = "com.typesafe.akka" %% "akka-http-experimental" % version - val http_core = "com.typesafe.akka" %% "akka-http-core-experimental" % version + val core = "com.typesafe.akka" %% "akka-stream-experimental" % version + val http = "com.typesafe.akka" %% "akka-http-experimental" % version + val http_core = "com.typesafe.akka" %% "akka-http-core-experimental" % version + + // FIXME: currently this is an empty dependency - as soon as this situation changes, we need to ensure manually added files under `akka.stream.testkit.*` are deleted! + val testkit = "com.typesafe.akka" %% "akka-stream-testkit-experimental" % version } val leveldb = "org.iq80.leveldb" % "leveldb" % "0.7"