Skip to content

Commit

Permalink
Merge pull request mDialog#37 from gramk/watcher-fix
Browse files Browse the repository at this point in the history
Add the ability to add subscribers after brando creation. Add scalariform
  • Loading branch information
chrisdinn committed Sep 24, 2014
2 parents dd2edd5 + 08a4e88 commit 1c3dff0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 73 deletions.
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
8 changes: 8 additions & 0 deletions scalariform.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import scalariform.formatter.preferences._

scalariformSettings

ScalariformKeys.preferences := (FormattingPreferences().
setPreference(RewriteArrowSymbols, true).
setPreference(DoubleIndentClassDeclaration, true).
setPreference(AlignSingleLineCaseStatements, true))
30 changes: 18 additions & 12 deletions src/main/scala/Brando.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,24 @@ case object Connected extends BrandoStateChange
case object AuthenticationFailed extends BrandoStateChange
case object ConnectionFailed extends BrandoStateChange



private class Connection(
brando: ActorRef,
address: InetSocketAddress,
connectionRetry: Long,
maxConnectionAttempts: Option[Int]) extends Actor with ReplyParser {
import context.dispatcher

object BlockingRequest{
def unapply(r:Request) : Option[Request] =
r.command.utf8String.toLowerCase match{
case "subscribe" => Some(r)
case "blpop" => Some(r)
case "brpop" => Some(r)
case "brpoplpush" => Some(r)
case _ => None
object BlockingRequest {
def unapply(r: Request): Option[Request] =
r.command.utf8String.toLowerCase match {
case "subscribe" Some(r)
case "blpop" Some(r)
case "brpop" Some(r)
case "brpoplpush" Some(r)
case _ None
}
}

var socket: ActorRef = _

val requesterQueue = mutable.Queue.empty[ActorRef]
Expand Down Expand Up @@ -153,10 +151,13 @@ class Brando(
def receive = disconnected orElse cleanListeners

def authenticated: Receive = {
case request: Request connection forward request
case request: Request
connection forward request
case x: Tcp.ConnectionClosed
notifyStateChange(Disconnected)
context.become(disconnected orElse cleanListeners)
case s: ActorRef
listeners = listeners + s
}

def disconnected: Receive = {
Expand All @@ -178,6 +179,9 @@ class Brando(

case ConnectionFailed
notifyStateChange(ConnectionFailed)

case s: ActorRef
listeners = listeners + s
}

def authenticating: Receive = {
Expand All @@ -195,6 +199,8 @@ class Brando(
case AuthenticationFailed
notifyStateChange(AuthenticationFailed)

case s: ActorRef
listeners = listeners + s
}

def cleanListeners: Receive = {
Expand Down
134 changes: 73 additions & 61 deletions src/test/scala/BrandoTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -299,84 +299,84 @@ class BrandoTest extends TestKit(ActorSystem("BrandoTest")) with FunSpecLike

}

describe("blocking requests"){
describe("subscribe") {
describe("blocking requests") {
describe("subscribe") {

it("should be able to subscribe to a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())
it("should be able to subscribe to a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())

subscriber ! Request("SUBSCRIBE", channel)
subscriber ! Request("SUBSCRIBE", channel)

expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))
}
expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))
}

it("should receive published messages from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())
val publisher = system.actorOf(Brando())
it("should receive published messages from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())
val publisher = system.actorOf(Brando())

subscriber ! Request("SUBSCRIBE", channel)
subscriber ! Request("SUBSCRIBE", channel)

expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))
expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))

publisher ! Request("PUBLISH", channel, "test")
expectMsg(Some(1)) //publisher gets back number of subscribers when publishing
publisher ! Request("PUBLISH", channel, "test")
expectMsg(Some(1)) //publisher gets back number of subscribers when publishing

expectMsg(PubSubMessage(channel, "test"))
}
expectMsg(PubSubMessage(channel, "test"))
}

it("should be able to unsubscribe from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())
val publisher = system.actorOf(Brando())
it("should be able to unsubscribe from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Brando())
val publisher = system.actorOf(Brando())

subscriber ! Request("SUBSCRIBE", channel)
subscriber ! Request("SUBSCRIBE", channel)

expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))
expectMsg(Some(List(Some(
ByteString("subscribe")),
Some(ByteString(channel)),
Some(1))))

subscriber ! Request("UNSUBSCRIBE", channel)
subscriber ! Request("UNSUBSCRIBE", channel)

expectMsg(Some(List(Some(
ByteString("unsubscribe")),
Some(ByteString(channel)),
Some(0))))
expectMsg(Some(List(Some(
ByteString("unsubscribe")),
Some(ByteString(channel)),
Some(0))))

publisher ! Request("PUBLISH", channel, "test")
expectMsg(Some(0))
publisher ! Request("PUBLISH", channel, "test")
expectMsg(Some(0))

expectNoMsg
expectNoMsg
}
}
}

describe("should be able to block on blpop"){

describe("should be able to block on blpop") {
val brando = system.actorOf(Brando())
try{
val channel = UUID.randomUUID().toString
val popBrando = system.actorOf(Brando())
popBrando ! Request("BLPOP", "blpop:list", "0")

expectNoMsg
brando ! Request("LPUSH","blpop:list", "blpop-value")
expectMsgType[Option[Long]]
expectMsg(Some(List(Some(
ByteString("blpop:list")),
Some(ByteString("blpop-value")))))
} finally{
implicit val timeout = Timeout(1.seconds)
Await.ready((brando ? Request("del", "blpop:list")), 1.seconds)
try {
val channel = UUID.randomUUID().toString
val popBrando = system.actorOf(Brando())
popBrando ! Request("BLPOP", "blpop:list", "0")

expectNoMsg

brando ! Request("LPUSH", "blpop:list", "blpop-value")
expectMsgType[Option[Long]]

expectMsg(Some(List(Some(
ByteString("blpop:list")),
Some(ByteString("blpop-value")))))

} finally {
implicit val timeout = Timeout(1.seconds)
Await.ready((brando ? Request("del", "blpop:list")), 1.seconds)
}
}
}
Expand Down Expand Up @@ -405,6 +405,18 @@ class BrandoTest extends TestKit(ActorSystem("BrandoTest")) with FunSpecLike

probe.expectMsg(AuthenticationFailed)
}

it("should send a notification to later added listener") {
val probe = TestProbe()
val probe2 = TestProbe()
val brando = system.actorOf(Brando("localhost", 13579, listeners = Set(probe2.ref)))
brando ! probe.ref

//3 retries * 2 seconds = 6 seconds
probe.expectNoMsg(5900.milliseconds)
probe2.expectMsg(ConnectionFailed)
probe.expectMsg(ConnectionFailed)
}
}

describe("Connection") {
Expand Down

0 comments on commit 1c3dff0

Please sign in to comment.