These are my notes and study guide on how I approach studying Principles of Reactive Programming
from Coursera.
The basic principle of reactive programming is: reacting to a sequence of events that happen in time, and using these patterns to build software systems that are more robust, more resilient, more flexible and better positioned to meet modern demands. -- Reactive Manifesto
In computing, reactive programming is a
programming paradigm oriented around data flows and the propagation of change
. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow. -- Wikipedia
- Youtube: Erik Meijer - What does it mean to be Reactive?
- Youtube: Dr. Roland Kuhn - Go Reactive at the Trivento Summercamp
- Slideware: Lutz HĂĽhnken - A Pragmatic View of Reactive
- Slideware: Jonas Boner - Life Beyond the illusion of present
- Slideware: Takipi - Advanced Production Debugging
- Mathias - Reactive Streams: The Now
Supervision
describes a dependency relationship between actors: the supervisor delegates tasks to subordinates and therefore must respond to their failures. -- Akka.io - Supervision and Monitoring
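For example, a parent can declare how it responds to the failures of its children with a supervisorStrategy. A minimal classic-Akka sketch (the Worker actor and the chosen directives are just for illustration, not part of the course assignment):

import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Resume, Stop}
import scala.concurrent.duration._

class Worker extends Actor {
  def receive = { case n: Int => sender() ! 100 / n } // may throw ArithmeticException
}

class Supervisor extends Actor {
  // decide, per failing child, what should happen when it throws
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException      => Resume   // keep the child's state, drop the failing message
    case _: IllegalArgumentException => Stop     // stop the child
    case _: Exception                => Restart  // recreate the child, losing its state
  }

  val worker = context.actorOf(Props[Worker], "worker")

  def receive = { case msg => worker forward msg } // delegate tasks to the subordinate
}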
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted but never its current state directly (except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can be either the full history of changes or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication with at-least-once message delivery semantics. -- Akka.io - Persistence
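A minimal sketch of what such a stateful actor looks like with the PersistentActor API (the counter domain here is made up for illustration):

import akka.persistence.PersistentActor

case class Add(amount: Int)    // command
case class Added(amount: Int)  // event; only events are persisted

class CounterActor extends PersistentActor {
  override def persistenceId: String = "counter-1"

  var total = 0

  // on (re)start the journal is replayed to rebuild the internal state
  override def receiveRecover: Receive = {
    case Added(amount) => total += amount
  }

  // commands are turned into events; the event is persisted, then applied
  override def receiveCommand: Receive = {
    case Add(amount) =>
      persist(Added(amount)) { event =>
        total += event.amount
      }
  }
}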
Akka persistence uses persistence plugins to store the event journal entries or snapshot 'current-state' entries into a persistence store or log. There are a lot of plugins available on the Akka Community Projects site; the most notable ones are:
- GitHub - akka-persistence-jdbc by myself
- GitHub - akka-persistence-inmemory by myself - Handy for testing purposes
- GitHub - akka-persistence-cassandra by Martin Krasser
- GitHub - akka-persistence-kafka by Martin Krasser
- Parleys - Heiko Seeberger - Akka Overview and State Machines
- Parleys - Nicolas Jozwiak - Supervise your Akka actors
- Parleys - Konrad Malawski - Resilient applications with Akka Persistence
- Youtube - Intro to Akka persistence with Patrik Nordwall
Actors are very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop. Pattern matching against messages is a convenient way to express an actor's behavior. They raise the abstraction level and make it much easier to write, test, understand and maintain concurrent and/or distributed systems. You focus on workflow—how the messages flow in the system—instead of low level primitives like threads, locks and socket IO. -- Akka.io
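A minimal sketch of such an actor (the counter protocol is just for illustration):

import akka.actor.Actor

class Counter extends Actor {
  var count = 0
  def receive = {                   // behaviour expressed as a pattern match over messages
    case "incr" => count += 1
    case "get"  => sender() ! count // reply asynchronously to whoever asked
  }
}

// usage (inside an ActorSystem):
//   val counter = system.actorOf(Props[Counter], "counter")
//   counter ! "incr"
//   counter ! "get"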
- Youtube - Up, Up, and Out: Scaling Software with Akka
- Youtube - Up And Out Scaling Software With Akka - Jonas Bonér
- Youtube - Akka 2.0: Scaling Up & Out With Actors
- Youtube - Above the Clouds: Introducing Akka
- Youtube - Deep Dive into the Typesafe Reactive Platform - Akka and Scala - with Nilanjan Raychaudhuri
The Stash trait enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking context.become
or context.unbecome
, all stashed messages can be "unstashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. To stash messages call stash()
, to unstash call unstashAll()
. Invoking stash()
adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases.
Use the Stash
trait instead of the Queue
in the example.
To send messages, Akka knows the following patterns: tell, ask and forward. Only use tell (fire and forget); you don't need the two other patterns.
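For reference, a sketch of what the three patterns look like in classic Akka (the Frontend actor, Work message and timeout are made-up names for illustration):

import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

case class Work(id: Int)

class Frontend(worker: ActorRef) extends Actor {
  import context.dispatcher                      // ExecutionContext needed by pipeTo
  implicit val timeout: Timeout = Timeout(3.seconds)

  def receive = {
    case "tell"    => worker ! Work(1)                     // fire and forget
    case "forward" => worker forward Work(2)               // keeps the original sender
    case "ask"     => (worker ? Work(3)).pipeTo(sender())  // reply comes back as a Future
  }
}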
The BinaryTreeSet
class:
class BinaryTreeSet extends Actor with ActorLogging with Stash {
import BinaryTreeNode._
import BinaryTreeSet._
var counter = 0
def createRoot: ActorRef = {
counter += 1
context.actorOf(BinaryTreeNode.props(0, initiallyRemoved = true), "rt-" + counter)
}
var root = createRoot
// optional
def receive = normal
// optional
/** Accepts `Operation` and `GC` messages. */
val normal: Receive = {
case msg: Insert =>
root ! msg
case msg: Contains =>
root ! msg
case msg: Remove =>
root ! msg
case GC =>
log.info("Receiving GC, creating newRoot and becoming garbageCollecting")
val newRoot = createRoot
root ! CopyTo(newRoot)
context.become(garbageCollecting(newRoot))
}
// optional
/** Handles messages while garbage collection is performed.
* `newRoot` is the root of the new binary tree where we want to copy
* all non-removed elements into.
*/
def garbageCollecting(newRoot: ActorRef): Receive = LoggingReceive {
case o: Operation =>
log.info("(Enqueue): {}", o)
stash()
case CopyFinished =>
log.info("(CopyFinished): destroying old tree nodes and replacing `root` with `newRoot` and replaying pending queue, and becoming normal")
root ! PoisonPill
root = newRoot
unstashAll()
context.become(normal)
}
}
The BinaryTreeNode
class:
class BinaryTreeNode(val elem: Int, initiallyRemoved: Boolean) extends Actor with ActorLogging {
import BinaryTreeNode._
import BinaryTreeSet._
// initiallyRemoved, is removed, will be GC'ed
def node(value: Int) = context.actorOf(Props(new BinaryTreeNode(value, initiallyRemoved = false)), s"$value")
var subtrees = Map[Position, ActorRef]()
var removed = initiallyRemoved
// optional
def receive = normal
def nodeInfo: String = s"elem: [$elem], initiallyRemoved: [$initiallyRemoved], removed: [$removed], subtrees: [$subtrees]"
// optional
/** Handles `Operation` messages and `CopyTo` requests. */
val normal: Receive = LoggingReceive {
// contains
case msg @ Contains(requester, id, e) if e == elem && removed =>
log.debug("(Contains): {}, returning ContainsResult({}, false)", nodeInfo, id)
requester ! ContainsResult(id, result = false)
case msg @ Contains(requester, id, e) if e == elem && !removed =>
log.debug("(Contains): {} == {}, not removed, returning ContainsResult({}, true)", e, elem, id)
requester ! ContainsResult(id, result = true)
case msg @ Contains(requester, id, e) if e < elem && subtrees.get(Left).isEmpty =>
log.debug("(Contains): Left is empty: Subtrees: {}, returning ContainsResult({}, false)", subtrees, id)
requester ! ContainsResult(id, result = false)
case msg @ Contains(_, _, e) if e < elem && subtrees.get(Left).nonEmpty =>
log.debug("(Contains): Left is nonEmpty, sending message: {}", msg)
subtrees.get(Left).foreach (_ ! msg)
case msg @ Contains(requester, id, e) if e > elem && subtrees.get(Right).isEmpty =>
log.debug("(Contains): Right is empty: Subtrees: {}, returning ContainsResult({}, false)", subtrees, id)
requester ! ContainsResult(id, result = false)
case msg @ Contains(_, _, e) if e > elem && subtrees.get(Right).nonEmpty =>
log.debug("(Contains): Right is nonEmpty, sending message: {}", msg)
subtrees.get(Right).foreach (_ ! msg)
// insert
case msg @ Insert(requester, id, e) if e == elem =>
log.debug("(Insert): {} == {}, returning OperationFinished({})", e, elem, id)
removed = false
requester ! OperationFinished(id)
case msg @ Insert(_, _, e) if e < elem && subtrees.get(Left).isEmpty =>
log.debug("(Insert): Left is empty, creating node and sending message: {}", msg)
val leftNode = node(e)
leftNode ! msg
subtrees += Left -> leftNode
case msg @ Insert(_, _, e) if e < elem && subtrees.get(Left).nonEmpty =>
log.info("(Insert): Left is nonEmpty, sending message to node")
subtrees.get(Left).foreach (_ ! msg)
case msg @ Insert(_, _, e) if e > elem && subtrees.get(Right).isEmpty =>
log.debug("(Insert): Right is empty, creating node and sending message: {}", msg)
val rightNode = node(e)
rightNode ! msg
subtrees += Right -> rightNode
case msg @ Insert(_, _, e) if e > elem && subtrees.get(Right).nonEmpty =>
log.info("(Insert): Right is nonEmpty, sending message to node")
subtrees.get(Right).foreach (_ ! msg)
// Remove
case msg @ Remove(requester, id, e) if e == elem =>
log.debug("(Remove): {} == {}, returning OperationFinished({})", e, elem, id)
removed = true
requester ! OperationFinished(id)
case msg @ Remove(requester, id, e) if e < elem && subtrees.get(Left).isEmpty =>
log.info("(Remove): Left isEmpty, returning OperationFinished({})", id)
requester ! OperationFinished(id)
case msg @ Remove(_, _, e) if e < elem && subtrees.get(Left).nonEmpty =>
log.info("(Remove): Left is nonEmpty, sending message: {}", msg)
subtrees.get(Left).foreach (_ ! msg)
case msg @ Remove(requester, id, e) if e > elem && subtrees.get(Right).isEmpty =>
log.info("(Remove): Right isEmpty, returning OperationFinished({})", id)
requester ! OperationFinished(id)
case msg @ Remove(_, _, e) if e > elem && subtrees.get(Right).nonEmpty =>
log.info("(Remove): Right is nonEmpty, sending message: {}", msg)
subtrees.get(Right).foreach (_ ! msg)
// CopyTo
case CopyTo(_) if removed && subtrees.isEmpty =>
log.info("(CopyTo): {}, node is removed and substrees are empty, becoming copied and sending OperationFinished to self", nodeInfo)
context.become(copying(subtrees.values.toSet, insertConfirmed = false))
self ! OperationFinished(elem)
case CopyTo(treeNode) if !removed && subtrees.isEmpty =>
log.info("(CopyTo): {}, node is not removed and substrees are empty, becoming copyTo and sending INSERT to newTree", nodeInfo)
treeNode ! Insert(self, 1, elem)
context.become(copying(subtrees.values.toSet, insertConfirmed = false))
case msg @ CopyTo(treeNode) if removed && subtrees.nonEmpty =>
val nodes: Set[ActorRef] = subtrees.values.toSet
nodes.foreach (_ ! msg)
context.become(copying(nodes, insertConfirmed = true))
log.info("(CopyTo): {} node *is removed*, and subtrees is nonEmpty, becoming copying({}, true) ", nodeInfo, nodes)
case msg @ CopyTo(treeNode) if !removed && subtrees.nonEmpty =>
treeNode ! Insert(self, 1, elem)
val nodes: Set[ActorRef] = subtrees.values.toSet
nodes.foreach (_ ! msg)
context.become(copying(nodes, insertConfirmed = false))
log.info("(CopyTo): {} node is not removed, and subtrees is nonEmpty, becoming copying({}, false) ", nodeInfo, nodes)
case msg =>
log.info("Dropping msg: {}, info: {}", msg, nodeInfo)
}
// optional
/** `expected` is the set of ActorRefs whose replies we are waiting for,
* `insertConfirmed` tracks whether the copy of this node to the new tree has been confirmed.
*/
def copying(expected: Set[ActorRef], insertConfirmed: Boolean): Receive = LoggingReceive {
case OperationFinished(_) if expected.isEmpty =>
log.info("(OperationFinished): {}, I've been copied, children are finished, returning CopyFinished to parent and stopping self", nodeInfo)
context.parent ! CopyFinished
context.stop(self)
case OperationFinished(_) if expected.nonEmpty =>
log.info("(OperationFinished): {}, I've been copied, waiting on children, becoming copying({}, true)", nodeInfo)
context.become(copying(expected, insertConfirmed = true))
case CopyFinished if expected.isEmpty && insertConfirmed =>
log.info("(CopyFinished): child finished, no more children and I've been copied, returning CopyFinished to parent and stopping self")
context.parent ! CopyFinished
context.stop(self)
case CopyFinished if expected.nonEmpty =>
val newExpected = expected.filterNot(_ == sender())
context.become(copying(newExpected, insertConfirmed))
if(newExpected.isEmpty) {
log.info("(CopyFinished): all children are finished, returning CopyFinished to parent and stopping self")
context.parent ! CopyFinished
context.stop(self)
} else {
log.info("(CopyFinished): child finished, waiting for another child, becoming copying({}, {})", newExpected, insertConfirmed)
}
case CopyFinished if expected.isEmpty && !insertConfirmed =>
log.info("(CopyFinished): children are finished, but I'm not copied yet, becoming copying({}, {})", expected, insertConfirmed)
context.become(copying(expected, insertConfirmed))
}
}
Users expect real time data. They want their tweets now. Their order confirmed now. They need prices accurate as of now. Their online games need to be responsive. As a developer, you demand fire-and-forget messaging. You don't want to be blocked waiting for a result. You want to have the result pushed to you when it is ready. Even better, when working with result sets, you want to receive individual results as they are ready. You do not want to wait for the entire set to be processed before you see the first row. The world has moved to push; users are waiting for us to catch up. Developers have tools to push data, this is easy. Developers need tools to react to push data. -- Introduction to Rx
Rx offers a natural paradigm for dealing with sequences of events. A sequence can contain zero or more events. Rx proves to be most valuable when
composing sequences of events
. -- Introduction to Rx
You can think of Rx as providing an API similar to Java 8 / Groovy / Scala collections (methods like filter, forEach, map, reduce, zip etc) - but which operates on an asynchronous stream of events rather than a collection. So you could think of Rx as like working with asynchronous
push-based
collections (rather than the traditional synchronous pull based collections). -- Camel Rx
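For example, you can apply the same operators you would use on a collection to a stream of events that arrive over time. A small RxScala sketch (the interval source is just for illustration):

import rx.lang.scala.Observable
import scala.concurrent.duration._

// an asynchronous stream of events, composed with collection-like operators
val ticks: Observable[Long] = Observable.interval(300.millis)   // pushes 0, 1, 2, ... over time
val evenTicks: Observable[String] = ticks.filter(_ % 2 == 0).map(n => s"even tick: $n")

val subscription = evenTicks.subscribe(s => println(s))         // react to each pushed element
// later: subscription.unsubscribe()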
Please note that Rx focuses on push-based
events. There is no way for the network to go from a push-based
model to a pull-based
model like with Reactive Streams, because the network has no notion of an upstream (the demand stream), in which the subscriber
communicates its demand for data to the publisher
. With Rx there is only a downwards stream, in which the publisher
pushes the data-items to the subscribers
. The RxJavaReactiveStreams project makes Rx compatible with Reactive Streams.
- A Playful Introduction to Rx - Erik Meijer
- RxJava: Reactive Extensions in Scala
- Ben Christensen - "Functional Reactive Programming with RxJava
- DevCamp 2010 Keynote - Rx: Curing your asynchronous programming blues
- Channel 9 - Rx Workshop: Introduction
- An Event-driven and Reactive Future - Jonathan Worthington
- Rx - Operators Reference
- RxJava <-> RxScala API
- ReactiveX - Portal
- RxScala
- RxJava
- RxJava Wiki
- Microsoft Open Technologies - Rx
- The Rx Observable
- Reactive Programming in the Netflix API with RxJava
- Lee Campbell - Reactive Extensions for .NET an Introduction
- MSDN - The Reactive Extensions (Rx)...
The textValues and clicks observables:
def textValues: Observable[String] =
Observable[String]({ subscriber =>
val eventHandler: PartialFunction[Event, Unit] = {
case ValueChanged(source) =>
subscriber.onNext(source.text)
}
field.subscribe(eventHandler)
Subscription {
field.unsubscribe(eventHandler)
}
})
def clicks: Observable[Button] =
Observable[Button] ({ subscriber =>
val eventHandler: PartialFunction[Event, Unit] = {
case ButtonClicked(source) =>
subscriber.onNext(source)
}
button.subscribe(eventHandler)
Subscription {
button.unsubscribe(eventHandler)
}
})
Wikipedia API:
def sanitized: Observable[String] =
obs.map(_.replaceAll(" ", "_"))
def recovered: Observable[Try[T]] =
obs.map(Try(_)).onErrorReturn(Failure(_))
def timedOut(totalSec: Long): Observable[T] =
obs.take(totalSec.seconds)
def concatRecovered[S](requestMethod: T => Observable[S]): Observable[Try[S]] =
obs.flatMap(requestMethod(_).recovered)
WikipediaSuggest.scala
val searchTerms: Observable[String] =
searchTermField.textValues
val suggestions: Observable[Try[List[String]]] =
searchTerms.flatMap(wikiSuggestResponseStream).recovered
val suggestionSubscription: Subscription =
suggestions.observeOn(eventScheduler).subscribe { (x: Try[List[String]]) =>
x.map(suggestionList.listData = _)
.recover { case t: Throwable =>
status.text = t.getLocalizedMessage
}
}
val selections: Observable[String] =
button.clicks.flatMap { _ =>
suggestionList.selection.items.self match {
case seq: Seq[String] if seq.nonEmpty =>
Observable.just(seq.head)
case _ =>
Observable.empty
}
}
val pages: Observable[Try[String]] =
selections.flatMap(wikiPageResponseStream).recovered
val pageSubscription: Subscription =
pages.observeOn(eventScheduler) subscribe { (x: Try[String]) =>
x.map (editorpane.text = _)
.recover { case t: Throwable =>
status.text = t.getLocalizedMessage
}
}
}
Futures provide a nice way to reason about performing many operations in parallel– in an efficient and non-blocking way. The idea is simple, a Future is a sort of a placeholder object that you can create for a result that does not yet exist. Generally, the result of the Future is computed concurrently and can be later collected. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code. -- ScalaDocs
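A minimal sketch of composing futures in plain Scala:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val fa: Future[Int] = Future { 21 }   // computed concurrently on the execution context
val fb: Future[Int] = Future { 2 }

val product: Future[Int] = for {      // non-blocking composition of both results
  a <- fa
  b <- fb
} yield a * b

println(Await.result(product, 1.second)) // 42; blocking here only to print the result in this sketch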
- Scala - Futures
- Akka - Futures
- The Neophyte's Guide to Scala Part 8 - Welcome to the Future
- The Neophyte's Guide to Scala Part 9 - Promises and Futures in Practice
- Ian Irvine - Notes (2014) - Monads and Effects
- Ian Irvine - Notes (2014) - Latency as an Effect
- Ian Irvine - Notes (2014) - Combinators on Futures
- Ian Irvine - Notes (2014) - Composing Futures
- Ian Irvine - Notes (2014) - Promises
Most combinators are explained in the videos by Erik Meijer. Please view these videos again and implement them in the nodescala package object. Some combinators are already available in the Future object itself, so if you are lazy, you can reuse those.
Does a Promise[T]().future
complete?
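The hint here is that a Promise that is never completed yields a Future that never completes; a minimal sketch (assuming this is the never combinator of the assignment):

import scala.concurrent.{Future, Promise}

// a Future that never completes, because its Promise is never fulfilled
def never[T]: Future[T] = Promise[T]().future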
The timeout looks a whole lot like the userInterrupted future structure. Reuse the futures userInterrupted and timeout. When any of those fail, the terminatedRequested future should fail.
Note that to cancel a Future, you should use the val subscription: Subscription = Future.run() { (ct: CancellationToken) => ... } construct.
The Future.run() { ct => } construct returns a Subscription that can be used to unsubscribe. When you call subscription.unsubscribe, the CancellationToken that is available in the curried function (the context, if you will), and that exposes the members isCancelled: Boolean and nonCancelled = !isCancelled, can be queried to figure out whether or not the future has been cancelled.
So the question remains: how does one unsubscribe, and thereby cancel all requests that the server handles, when a subscription is in scope?
The respond method, which will be called by the start method (you should implement both), will stream the result back using the exchange's write method.
In a loop, you should check whether or not the token has been cancelled.
In a loop, you should check whether or not the response has more Strings; it is an Iterator.
After you're done writing to the exchange, please close the stream using exchange.close().
The solution is:
private def respond(exchange: Exchange, token: CancellationToken, response: Response): Unit = {
while(response.hasNext && token.nonCancelled) {
exchange.write(response.next())
}
exchange.close()
}
Erik Meijer likes the async-await construct, because you can use imperative constructs together with async constructs while still being non-blocking. The while(ct.nonCancelled) 'problem' makes it imperative.
You should first create a listener, then start the listener, which returns a Subscription. Then you should create a cancellable context using the Future.run() {} construct. While the context is nonCancelled, you should wait for the nextRequest from the listener and then respond to it.
Return a combined Subscription
with the Subscription(subscription1, subscription2)
construct.
You can respond by applying the handler
with the request
, which gives you a Response
, and calling the respond
method.
The solution is:
def start(relativePath: String)(handler: Request => Response): Subscription = {
val listener = createListener(relativePath)
val listenerSubscription: Subscription = listener.start()
val requestSubscription: Subscription = Future.run() { (ct: CancellationToken) =>
async {
while (ct.nonCancelled) {
val (req, exch) = await (listener.nextRequest())
respond(exch, ct, handler(req))
}
}
}
Subscription(listenerSubscription, requestSubscription)
}
You could probably rewrite the solution to:
def start(relativePath: String)(handler: Request => Response): Subscription = {
val listener = createListener(relativePath)
val listenerSubscription: Subscription = listener.start()
val requestSubscription: Subscription = Future.run() { (ct: CancellationToken) =>
Future {
while (ct.nonCancelled) {
Await.result(listener.nextRequest().map { req =>
respond(req._2, ct, handler(req._1))
}, Duration.Inf)
}
}
}
Subscription(listenerSubscription, requestSubscription)
}
or
def start(relativePath: String)(handler: Request => Response): Subscription = {
val listener = createListener(relativePath)
val listenerSubscription: Subscription = listener.start()
val requestSubscription: Subscription = Future.run() { (ct: CancellationToken) =>
Future {
while (ct.nonCancelled) {
Await.result(listener.nextRequest().map {
case (req: Request, exch: Exchange ) => respond(exch, ct, handler(req))
}, Duration.Inf)
}
}
}
Subscription(listenerSubscription, requestSubscription)
}
Functional reactive programming (FRP) is a programming paradigm for reactive programming (asynchronous dataflow programming) using the building blocks of functional programming (e.g. map, reduce, filter). FRP has been used for programming graphical user interfaces (GUIs), robotics, and music, aiming to simplify these problems by explicitly modeling time. -- Wikipedia
I would advise reading/viewing the resources below to get a good idea of what Functional Reactive Programming is. The model we use this week is push based, in which systems take events and push them through a 'signal' network to achieve a result. The basic idea of FRP that we focus on this week is that events are combined into 'signals' that always have a current value, but change discretely. The changes are event-driven. But instead of having an event handler that returns Unit (like the onClick handler and such), it returns a value.
FRP in a nutshell (for now at least):
When we do an assignment in Scala, the following happens:
scala> var a = 1
a: Int = 1
scala> var b = 2
b: Int = 2
scala> var c = a + b
c: Int = 3
scala> a = 2
a: Int = 2
scala> c
res1: Int = 3
scala> var c = a + b
c: Int = 4
As we can see, the value of c did not change when we changed the value of a from 1 to 2. This is normal behavior, because we have expressed the relationship at one point in the execution of the program.
But what if c would change whenever we changed a value it depends on, like a? This would mean that there is a dependency between c, a and b that expresses how these values relate over time. So the basic idea is that c will change when we change either a and/or b.
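With the Signal and Var abstractions from the course's frp package, that dependency can be expressed directly; a small sketch:

val a = Var(1)
val b = Var(2)
val c = Signal(a() + b())   // c now depends on a and b

println(c())                // 3
a() = 2                     // updating the Var re-evaluates every Signal that depends on it
println(c())                // 4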
The following should work:
def tweetRemainingCharsCount(tweetText: Signal[String]): Signal[Int] =
Signal(MaxTweetLength - tweetLength(tweetText()))
def colorForRemainingCharsCount(remainingCharsCount: Signal[Int]): Signal[String] =
Signal {
remainingCharsCount() match {
case count if (0 to 14).contains(count) => "orange"
case count if count < 0 => "red"
case _ => "green"
}
}
Please first try it yourself, then if you wish, verify.
def computeDelta(a: Signal[Double], b: Signal[Double], c: Signal[Double]): Signal[Double] =
Signal {
Math.pow(b(), 2) - (4 * a() * c())
}
def computeSolutions(a: Signal[Double], b: Signal[Double], c: Signal[Double], delta: Signal[Double]): Signal[Set[Double]] =
Signal {
delta() match {
case discriminant if discriminant < 0 => Set()
case discriminant if discriminant == 0 => Set(calcLeft(a(), b(), c()))
case discriminant => Set(calcLeft(a(), b(), c()), calcRight(a(), b(), c()))
}
}
def calcLeft(a: Double, b: Double, c: Double): Double =
(-1 * b + Math.sqrt(Math.pow(b, 2) - (4 * a * c))) / (2 * a)
def calcRight(a: Double, b: Double, c: Double): Double =
(-1 * b - Math.sqrt(Math.pow(b, 2) - (4 * a * c))) / (2 * a)
Please first try it yourself, then if you wish, verify.
def computeValues(namedExpressions: Map[String, Signal[Expr]]): Map[String, Signal[Double]] = {
namedExpressions.mapValues { expr =>
Signal(eval(expr(), namedExpressions))
}
}
def eval(expr: Expr, references: Map[String, Signal[Expr]]): Double = {
expr match {
case Literal(v) => v
case Ref(name) => eval(getReferenceExpr(name, references), references - name)
case Plus(aExpr, bExpr) => eval(aExpr, references) + eval(bExpr, references)
case Minus(aExpr, bExpr) => eval(aExpr, references) - eval(bExpr, references)
case Times(aExpr, bExpr) => eval(aExpr, references) * eval(bExpr, references)
case Divide(aExpr, bExpr) => eval(aExpr, references) / eval(bExpr, references)
case _ => Double.MaxValue
}
}
/** Get the Expr for a referenced variable.
* If the variable is not known, returns a literal NaN.
*/
private def getReferenceExpr(name: String, references: Map[String, Signal[Expr]]): Expr = {
references.get(name).fold[Expr](Literal(Double.NaN)) {
exprSignal => exprSignal()
}
}
- What is the difference between view, stream and iterator?
- Wikipedia - Functional Reactive Programming
- Functional Reactive Animation - Elliott / Hudak (PDF)
- Deprecating the Observer Pattern - Odersky / Maier (PDF)
- Stackoverflow - What happened to scala.react?
- Reactive Design Patterns - Kuhn - Chapter 1 (PDF)
- Functional Reactive Programming - Blackheath - Chapter 1 (PDF)
- Reactive Web Applications with Play - Bernhardt - Chapter 1 (PDF)
- Reactive Application Development - Devore - Chapter 1 (PDF)
- Functional and Reactive Domain Modeling - Ghosh - Chapter 1 (PDF)
- An Introduction to Functional Reactive Programming
- Functional Reactive Programming in Elm - Evan Czaplicki
- Building Reactive Apps - James Ward
- What does invariant mean?
- The Neophyte's Guide to Scala Part 12 - Type Classes
- Learn You a Haskell - Functors, Applicative Functors and Monoids
- Learn You a Haskell - A Fistful of Monads
- Brian Beckman - Don't fear the Monad
- What is a Priority Queue?
- What is a Binary Heap?
- Algorithms with Attitude - Introduction to Binary Heaps
- Algorithms with Attitude - Binary Heaps for Priority Queues
- Algorithms with Attitude - Optimized Heapify
- Algorithms with Attitude - HeapSort
- Algorithms with Attitude - Linear Time BuildHeap
- Typeclasses in Scala with Dan Rosen
- Vladimir Kostyukov's Scalacaster: algorithms and data structures in Scala
For testing, you want to insert two values into the heap
, for example with:
forAll { (x: Int, y: Int) =>
}
When you add the values to the heap and search for the minimum with findMin, it is handy to know whether x or y is the smaller; the following will help:
scala> def order = scala.math.Ordering.Int
order: math.Ordering.Int.type
scala> order.min(2,1)
res0: Int = 1
The heap has the method ord
that does the same.
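Putting the hints together, a property along these lines should work inside your properties class, where forAll is in scope (assuming the assignment's empty, insert, findMin and ord members):

property("min of two inserts") = forAll { (x: Int, y: Int) =>
  val h = insert(y, insert(x, empty))   // insert both values into an empty heap
  findMin(h) == ord.min(x, y)           // the minimum must be the smaller of the two
}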
To generate a heap, you can use the following code:
lazy val genHeap: Gen[H] = for {
n <- arbitrary[Int]
h <- oneOf(empty, genHeap)
} yield insert(n, h)
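Note that for forAll to generate heaps for you, you typically also need an implicit Arbitrary wired to this generator (a small assumption about how your test class is set up):

import org.scalacheck.Arbitrary

// make the generator available to forAll { (h: H) => ... }
implicit lazy val arbHeap: Arbitrary[H] = Arbitrary(genHeap)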
Did you notice the following? We have three methods: isEmpty, findMin and deleteMin. When you combine these methods you can iterate over the heap until it is empty and put the contents into a list:
def heapToList(h: H): List[Int] =
if(isEmpty(h)) Nil else findMin(h) :: heapToList(deleteMin(h))
Lists can be sorted:
scala> val xs = List(2,3,1,5,6,2,3,4,0,1,2)
xs: List[Int] = List(2, 3, 1, 5, 6, 2, 3, 4, 0, 1, 2)
scala> xs.sorted
res0: List[Int] = List(0, 1, 1, 2, 2, 2, 3, 3, 4, 5, 6)
Lists can also be compared:
scala> List(1, 2) == List(1, 2)
res0: Boolean = true
scala> List(1, 2) == List(2, 1)
res1: Boolean = false
scala> List(1, 2) == List(2, 1).sorted
res2: Boolean = true
scala> List(2, 1).sorted == List(2, 1).sorted
res3: Boolean = true
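Combining heapToList with sorted gives a useful property: the minima you take out of any heap should come out in sorted order. A sketch, assuming the genHeap generator above:

property("heap yields a sorted sequence") = forAll(genHeap) { (h: H) =>
  val xs = heapToList(h)   // repeatedly take and delete the minimum
  xs == xs.sorted          // the resulting list must already be sorted
}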
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable. -- Hystrix - GitHub
Applications in complex distributed architectures have dozens of dependencies, each of which will inevitably fail at some point. If the host application is not isolated from these external failures, it risks being taken down with them. -- Hystrix Wiki
Hystrix is not about Futures and Promises, it is about bulk-heading and isolating dependencies by limiting concurrent execution, circuit breakers, real time monitoring and metrics. Futures are just a mechanism by which async execution is exposed. Futures by themselves do not provide the same degree of fault-tolerance functionality (though parts of it can be achieved with careful use of timeouts and thread-pool sizing). You could think of Hystrix as a hardened extension of a Future. -- Google Groups
Note: it basically focuses on the Resilient part of Reactive applications.
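To make that concrete, a minimal HystrixCommand sketch in Scala (the user-service call and group name are made up for illustration):

import com.netflix.hystrix.{HystrixCommand, HystrixCommandGroupKey}

// isolate a remote call behind a command with a fallback
class GetUserNameCommand(userId: Long)
  extends HystrixCommand[String](HystrixCommandGroupKey.Factory.asKey("UserService")) {

  // runs on a Hystrix-managed thread pool, guarded by a circuit breaker and a timeout
  override def run(): String = callRemoteUserService(userId)

  // used when the call fails, times out, or the circuit is open
  override def getFallback(): String = "unknown user"

  private def callRemoteUserService(id: Long): String = s"user-$id" // placeholder for the real call
}

// usage: new GetUserNameCommand(42L).execute()  // synchronous
//        new GetUserNameCommand(42L).queue()    // returns a java.util.concurrent.Future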
- Netflix - Introducing Hystrix for Resilience Engineering
- Enonic - Resilience with Hystrix
- Ben Christensen - Application Resilience in a Service-Oriented Architecture using Hystrix
- Ben Christensen - Application Resilience Engineering and Operations at Netflix with Hystrix - JavaOne 2013
On 2015-04-28 there were 23,200 students enrolled, which is roughly 12,000 fewer than last time (the 2014 edition).
- Scalaz - a Scala library for functional programming.
- Learning Scalaz