Gesture Classifier #37

Merged: 34 commits, Jan 30, 2015

Commits
fe35adc
Initial (faulty!) stab at building a gesture tokenizer - needs more w…
Jan 20, 2015
c440c79
Defined gesture event matching (against a sample window)
Jan 20, 2015
7c2df3f
Added in window matching stager (to identify matching gestures in a s…
Jan 20, 2015
dfdc856
sensorOut refactored to be argument to sensorTransformation
Jan 20, 2015
20bce9f
Refactored stage: renamed SamplingWindow; now correctly deals with te…
Jan 21, 2015
cc82e4a
Introduced ZipSet (finite set variant of Zip and ZipWith).
Jan 21, 2015
49f879e
Fixed issue with flushing staging buffers as the stream shuts down.
Jan 21, 2015
1aeb54b
First stab at defining a gesture classification workflow
Jan 21, 2015
832ec43
Merge remote-tracking branch 'upstream/master' into svm
Jan 23, 2015
0425bce
Refactored code to utilise flowgraphs throughout (in an attempt to be…
Jan 23, 2015
d1856b2
Added in singleton factories to aid flowgraph construction
Jan 23, 2015
06827df
Merge remote-tracking branch 'upstream/master' into svm
Jan 24, 2015
64f9d99
Merge remote-tracking branch 'upstream/master' into svm
Jan 24, 2015
6776f2f
Partially complete re-wiring of flowgraphs
Jan 25, 2015
4849d74
Need to complete external wiring to GestureClassificationWorkflow
Jan 25, 2015
073d58a
Only external wiring needs to be sorted now!
Jan 25, 2015
ce4a76b
Minor format changes
Jan 25, 2015
5b58264
Correctly wired up now - yeah!
Jan 25, 2015
bd45005
Merge remote-tracking branch 'upstream/master' into svm
Jan 26, 2015
6198894
Gesture classification workflow completed.
Jan 26, 2015
3a03592
Refactored monolithic dev. code into a series of workflow package mem…
Jan 26, 2015
19e56ee
Added in akka stream testkit code
Jan 27, 2015
9f1d8fe
Added in SamplingWindow tests
Jan 27, 2015
ec0d3fb
SamplingWindow testing complete
Jan 27, 2015
94842af
Added in ZipSet tests
Jan 27, 2015
a0d3e6a
GroupBySample tests
Jan 27, 2015
617ede3
MergeTransformations tests added
Jan 27, 2015
b52be7b
Added in ModulateSensorNet tests
Jan 27, 2015
c94cca0
Simplified SamplingWindow to SlidingWindow
Jan 28, 2015
2e3f9de
Added in IdentifyGestureEvents testing
Jan 28, 2015
6b5e027
Partial attempt at gesture testing
Jan 28, 2015
7b10ea3
Generalised Zip flowgraph nodes: now have ZipN and ZipSet
Jan 29, 2015
689260d
Added in classification assertional language.
Jan 29, 2015
e56131b
Added in GestureClassification tests
Jan 29, 2015
Files changed
3 changes: 2 additions & 1 deletion server/exercise/build.sbt
@@ -33,5 +33,6 @@ libraryDependencies ++= Seq(
   scalatest % "test",
   scalacheck % "test",
   akka.testkit % "test",
-  spray.testkit % "test"
+  spray.testkit % "test",
+  akka.streams.testkit % "test"
 )
4 changes: 3 additions & 1 deletion server/exercise/src/main/resources/classification.conf
@@ -1,10 +1,12 @@
 classification {
   frequency = 100 # Hz
   model.path = "/models"
 
   gesture {
     tap {
       model = "svm-model-tap-features"
-      size = 25
+      size = 25 # number of sampled events (seconds = size / frequency)
       threshold = 0.80
     }
   }
 }
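With `frequency = 100` Hz and `size = 25`, each gesture window therefore covers 25 / 100 = 0.25 s of sensor data.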
server/exercise/src/main/scala/com/eigengo/lift/exercise/AccelerometerValue.scala
@@ -1,10 +1,5 @@
 package com.eigengo.lift.exercise
 
-import scodec.bits.{BitVector, ByteOrdering}
-
-import scala.annotation.tailrec
-import scalaz.\/
-
 /**
  * Accelerometer data groups ``values`` at the given ``samplingRate``
  * @param samplingRate the sampling rate in Hz
@@ -36,7 +36,7 @@ private[svm] trait ParserUtils extends StringBuilding {
  def FractionalPart = rule {
    optional('.' ~ oneOrMore(Digit)) ~ optional(ignoreCase('e') ~ optional(anyOf("+-")) ~ oneOrMore(Digit))
  }

  def Number: Rule1[String] = rule {
    capture(optional(anyOf("+-")) ~ oneOrMore(Digit) ~ optional(FractionalPart)) ~> ((n: String) => push((Try(n.toInt) orElse Try(n.toDouble)).get.toString))
  }
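For example, the `Number` rule above accepts inputs such as "42", "-3.5" and "1e3", normalising each match via `Try(n.toInt) orElse Try(n.toDouble)` (so "1e3" is pushed as "1000.0").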
server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/ClassificationAssertions.scala
@@ -0,0 +1,28 @@
package com.eigengo.lift.exercise.classifiers.workflows

object ClassificationAssertions {

/**
* Facts that may hold of sensor data
*/
sealed trait Fact
case class Gesture(name: String, matchProbability: Double) extends Fact
case object Unknown extends Fact

/**
* Quantifier-free assertions that may hold of sensor data
*/
sealed trait Assertion
case class Predicate(fact: Fact) extends Assertion
case class Conjunction(assert1: Assertion, assert2: Assertion, assertRemaining: Assertion*) extends Assertion
case class Disjunction(assert1: Assertion, assert2: Assertion, assertRemaining: Assertion*) extends Assertion

/**
* Binds an assertion to a sensor data value, recording that the assertion holds for that value.
*
* @param assertion assertion that is true for the sensor data value
* @param value sensor data value for which the assertion holds
*/
case class Bind[A](assertion: Assertion, value: A)

}
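As a usage sketch of the assertion language (the accelerometer value and match probability below are illustrative only):

```scala
import ClassificationAssertions._

// Hypothetical sample value - AccelerometerValue exposes x, y and z components
val av = AccelerometerValue(x = 1, y = 2, z = 3)

// "either a tap was recognised with probability 0.85, or the window is unclassified"
val assertion: Assertion =
  Disjunction(Predicate(Gesture("tap", 0.85)), Predicate(Unknown))

// Bind the assertion to the sensor value it describes
val tagged: Bind[AccelerometerValue] = Bind(assertion, av)
```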
server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/GestureWorkflows.scala
@@ -0,0 +1,227 @@
package com.eigengo.lift.exercise.classifiers.workflows

import akka.stream.scaladsl._
import breeze.linalg.DenseMatrix
import com.eigengo.lift.exercise.AccelerometerValue
import com.eigengo.lift.exercise.classifiers.svm.{SVMClassifier, SVMModelParser}
import com.typesafe.config.Config

/**
* Trait that implements reactive stream components that can:
*
* * [IdentifyGestureEvents] taps into sensor streams and emits `Fact` events whenever a sample window is classified as a gesture
* * [MergeSignals] merges a collection of signals (e.g. gesture facts) into a single signal
* * [ModulateSensorNet] modulates the signals (e.g. by tagging them) in a network of sensors using a transformation signal
* * [GestureClassification] wires these together: monitors an input sensor network for gestures and tags an output sensor network with the highest-probability matches
*/
// TODO: calling code needs to "normalise" sensor stream against the time dimension - i.e. here we assume that events occur with a known frequency (use `TickSource` as a driver for this?)
trait GestureWorkflows extends SVMClassifier {

import ClassificationAssertions._
import FlowGraphImplicits._

def name: String
def config: Config

def frequency = {
val value = config.getInt("classification.frequency")
assert(value > 0)
value
}
def threshold = {
val value = config.getDouble(s"classification.gesture.$name.threshold")
assert(0 <= value && value <= 1)
value
}
def windowSize = {
val value = config.getInt(s"classification.gesture.$name.size")
assert(value > 0)
value
}

// NOTE: here we accept throwing an exception in loading R libSVM models (since this indicates a catastrophic configuration error!)
private lazy val model = new SVMModelParser(name)(config).model.get

/**
* Measures probability that sampled window is recognised as a gesture event.
*
* @param sample sampled data to be tested
* @return probability that a gesture was recognised in the sample window
*/
private def probabilityOfGestureEvent(sample: List[AccelerometerValue]): Double = {
require(sample.length == windowSize)

val data = DenseMatrix(sample.map(v => (v.x.toDouble, v.y.toDouble, v.z.toDouble)): _*)

predict(model, data, taylor_radial_kernel()).positiveMatch
}

/**
* Flowgraph that taps the in-out stream and, if a gesture is recognised, sends a `Fact` message to the `tap` sink.
*/
class IdentifyGestureEvents {
val in = UndefinedSource[AccelerometerValue]
val out = UndefinedSink[AccelerometerValue]
val tap = UndefinedSink[Fact]

val graph = PartialFlowGraph { implicit builder =>
val split = Broadcast[AccelerometerValue]

in ~> split ~> out
split ~> Flow[AccelerometerValue].transform(() => SlidingWindow[AccelerometerValue](windowSize)).map { (sample: List[AccelerometerValue]) =>
if (sample.length == windowSize) {
// Saturated windows may be classified
val matchProbability = probabilityOfGestureEvent(sample)

if (matchProbability > threshold) {
Gesture(name, matchProbability)
} else {
Unknown
}
} else {
// Truncated windows are never classified (these typically occur when the stream closes)
Unknown
}
} ~> tap
}
}

object IdentifyGestureEvents {
def apply() = new IdentifyGestureEvents()
}

/**
* Flowgraph that merges (via a user supplied function) a collection of input sources into a single output sink.
*
* @param size number of input sources that we are to bundle and merge
* @param merge function used to merge the bundled signal values into a single output value
*/
class MergeSignals[A, B](size: Int, merge: Set[A] => B) {
require(size > 0)

val in = (0 until size).map(_ => UndefinedSource[A])
val out = UndefinedSink[B]

val graph = PartialFlowGraph { implicit builder =>
// We separate out size 1 case since `ZipN` nodes need at least 2 upstream nodes
if (size == 1) {
in.head ~> Flow[A].map(v => merge(Set(v))) ~> out
} else {
val zip = ZipN[A](in.size)

for ((probe, index) <- in.zipWithIndex) {
probe ~> zip.in(index)
}
zip.out ~> Flow[Set[A]].map(merge) ~> out
}
}
}

object MergeSignals {
def apply[A, B](size: Int)(merge: Set[A] => B) = {
new MergeSignals[A, B](size, merge)
}
}

/**
* Flowgraph that modulates all sensors in a network of location tagged sensors. Messages on the `transform` source
* determine how signals in the sensor net are modulated or transformed.
*
* @param locations set of locations that make up the sensor network
*/
class ModulateSensorNet[A, B, L](locations: Set[L]) {
require(locations.nonEmpty)

val in = locations.map(loc => (loc, UndefinedSource[A])).toMap
val transform = UndefinedSource[Fact]
val out = locations.map(loc => (loc, UndefinedSink[Bind[A]])).toMap

val graph = PartialFlowGraph { implicit builder =>
// We separate out 1-element case since `Broadcast` nodes need at least 2 downstream nodes
if (locations.size == 1) {
val zip = ZipWith[A, Fact, Bind[A]]((msg: A, tag: Fact) => tag match {
case Gesture(name, matchProb) =>
Bind(Predicate(Gesture(name, matchProb)), msg)

case Unknown =>
Bind(Predicate(Unknown), msg)
})

in(locations.head) ~> zip.left
transform ~> zip.right
zip.out ~> out(locations.head)
} else {
val broadcast = Broadcast[Fact]

transform ~> broadcast
for ((location, sensor) <- in) {
val zip = ZipWith[A, Fact, Bind[A]]((msg: A, tag: Fact) => tag match {
case Gesture(name, matchProb) =>
Bind(Predicate(Gesture(name, matchProb)), msg)

case Unknown =>
Bind(Predicate(Unknown), msg)
})

sensor ~> zip.left
broadcast ~> zip.right
zip.out ~> out(location)
}
}
}
}

object ModulateSensorNet {
def apply[A, B, L](locations: Set[L]) = new ModulateSensorNet[A, B, L](locations)
}

/**
* Flowgraph that monitors the `inputLocations` sensor network for recognisable gestures. When gestures are detected,
* messages on the `outputLocations` sensor network are tagged and grouped.
*
* @param inputLocations locations that make up the input sensor network
* @param outputLocations locations that make up the output sensor network
*/
class GestureClassification[L](inputLocations: Set[L], outputLocations: Set[L]) {
require(inputLocations.nonEmpty)
require(outputLocations.nonEmpty)

private val identify = inputLocations.map(loc => (loc, IdentifyGestureEvents())).toMap
private val modulate = ModulateSensorNet[AccelerometerValue, Bind[AccelerometerValue], L](outputLocations)
private val merge = MergeSignals[Fact, Fact](inputLocations.size) { (obs: Set[Fact]) =>
require(obs.nonEmpty)

val matches = obs.collect { case gesture @ Gesture(`name`, _) => gesture }
if (matches.nonEmpty) {
matches.maxBy(_.matchProbability)
} else {
obs.head
}
}

// Tapped sensors - monitored for recognisable gestures
val inputTap = inputLocations.map(loc => (loc, identify(loc).in)).toMap
val outputTap = inputLocations.map(loc => (loc, identify(loc).out)).toMap

// Modulation sensors - binds sensor data with gesture facts
val inputModulate = outputLocations.map(loc => (loc, modulate.in(loc))).toMap
val outputModulate = outputLocations.map(loc => (loc, modulate.out(loc))).toMap

val graph = PartialFlowGraph { implicit builder =>
builder.importPartialFlowGraph(merge.graph)

// Wire in tapped sensors
for ((loc, index) <- inputLocations.zipWithIndex) {
builder.importPartialFlowGraph(identify(loc).graph)
builder.connect(identify(loc).tap, Flow[Fact], merge.in(index))
}

// Wire in modulation
builder.importPartialFlowGraph(modulate.graph)
builder.connect(merge.out, Flow[Fact], modulate.transform)
}
}

object GestureClassification {
def apply[L](inputLocations: Set[L], outputLocations: Set[L]) = new GestureClassification[L](inputLocations, outputLocations)
}

}
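The components above compose as partial flow graphs. A minimal sketch of instantiating the top-level classifier follows; the `SensorLocation` type, the config resource name, and the assumption that `SVMClassifier` requires no further members are all illustrative, and attaching concrete sources and sinks to the `UndefinedSource`/`UndefinedSink` endpoints depends on the pre-1.0 akka-streams milestone in use:

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical sensor location type
sealed trait SensorLocation
case object Wrist extends SensorLocation

object TapWorkflow extends GestureWorkflows {
  val name = "tap"
  val config = ConfigFactory.load("classification") // assumed resource name
}

// Monitor the wrist sensor for taps; tag its output stream with the merged gesture facts
val classifier = TapWorkflow.GestureClassification[SensorLocation](Set(Wrist), Set(Wrist))

// classifier.inputTap / classifier.outputTap and classifier.inputModulate /
// classifier.outputModulate expose the endpoints to wire into a full FlowGraph
```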
server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala
@@ -0,0 +1,80 @@
package com.eigengo.lift.exercise.classifiers.workflows

import akka.stream.stage.{TerminationDirective, Directive, Context, PushPullStage}
import scala.collection.mutable

/**
* Streaming stage that buffers events and slides a window over streaming input data. Transmits each observed window
* downstream.
*
* @param size size of the internal buffer, and hence of the sliding window
*/
class SlidingWindow[A] private (size: Int) extends PushPullStage[A, List[A]] {
require(size > 0)

private val buffer = mutable.Queue[A]()
private var isSaturated = false

override def onPush(elem: A, ctx: Context[List[A]]): Directive = {
if (buffer.length == size) {
// Buffer is full, so push new window
buffer.dequeue()
buffer.enqueue(elem)
ctx.push(buffer.toList)
} else {
// Buffer is not yet full, so keep consuming from our upstream
buffer.enqueue(elem)
if (buffer.length == size) {
// Buffer has become full, so push new window and record saturation
isSaturated = true
ctx.push(buffer.toList)
} else {
ctx.pull()
}
}
}

override def onPull(ctx: Context[List[A]]): Directive = {
if (ctx.isFinishing) {
// Streaming stage is shutting down, so we ensure that all buffer elements are flushed prior to finishing
if (buffer.isEmpty) {
// Buffer is empty, so we simply finish
ctx.finish()
} else if (buffer.length == 1) {
// Buffer holds its final element, so flush it (if it has not already been emitted) and finish
if (isSaturated) {
// Buffer was previously saturated, so head element has already been seen
buffer.dequeue()
ctx.finish()
} else {
// Buffer was never saturated, so head element needs to be pushed
ctx.pushAndFinish(List(buffer.dequeue()))
}
} else {
// Buffer is non-empty, so empty it by sending undersized (non-empty) truncated window sequence - we will eventually finish here
if (isSaturated) {
// Buffer was previously saturated, so head element has already been seen
buffer.dequeue()
ctx.push(buffer.toList)
} else {
// Buffer was never saturated, so head element should be part of truncated window
val window = buffer.toList
buffer.dequeue()
ctx.push(window)
}
}
} else {
ctx.pull()
}
}

override def onUpstreamFinish(ctx: Context[List[A]]): TerminationDirective = {
ctx.absorbTermination()
}
}

object SlidingWindow {
def apply[A](size: Int): SlidingWindow[A] = {
new SlidingWindow(size)
}
}
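As a behavioural sketch (wired exactly as in `IdentifyGestureEvents` above), a size-3 sliding window over the elements 1..5 emits saturated windows as they form and then flushes truncated windows when the stream completes:

```scala
import akka.stream.scaladsl.Flow

val windowed = Flow[Int].transform(() => SlidingWindow[Int](3))
// For the input 1, 2, 3, 4, 5 downstream receives:
//   List(1, 2, 3), List(2, 3, 4), List(3, 4, 5)  // saturated windows
//   List(4, 5), List(5)                          // truncated flush at stream end
```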