Skip to content

Commit

Permalink
Handle topic not found. Scala 2.12 and sbt 1 (#36)
Browse files Browse the repository at this point in the history
* Handle topics not found
* Publish request to non-existing topic should fail
* update .travis.yml
  • Loading branch information
Sergey Novikov authored Nov 14, 2017
1 parent 623f2f4 commit 5226723
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 96 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sudo: required
language: scala

scala:
- 2.11.8
- 2.12.4

jdk:
- oraclejdk8
Expand Down Expand Up @@ -44,7 +44,7 @@ install:
script:
- sbt coverage test coverageReport
- sbt assembly
- java -jar $(ls $TRAVIS_BUILD_DIR/target/scala-2.11/sns-*.jar | tail -1) > log.txt &
- java -jar $(ls $TRAVIS_BUILD_DIR/target/scala-2.12/sns-*.jar | tail -1) > log.txt &
- >
bash <(curl -s https://raw.githubusercontent.com/s12v/wait4port/master/wait4port.sh)
http://localhost:9911
Expand Down Expand Up @@ -75,7 +75,7 @@ deploy:
provider: releases
api-key:
secure: "iMohs9rpYzhnYdUdCvh+t9crjoF2lcWuRlSCg+3kadXPt0V8lHoPMZio0wNROMtLTkLUqHcZgwHnvZ7T3PwfKrNAoZ7HnQbpXng3CApZgL68zQDwcykb0yOvZDopcjzZ/xPw29wnP8Sl5D6yelGakyx10AyjepHoesDc/hwnj8e0WYB2OflY6TiZja9n/tTXLr6wBhHnbe3b3t14WlRCM2w6CFfOGCo9bVGJG0V0kECZDz7hDYLbsmjhvpbEawUuNJAOM5AzrUetnbaA/TRjieDZF7/jhQiWsd4zlU4L0/JjGc+pNaz0aXk6Soyb4SfopQfIPoKMYYIwqiOkJxRmXCvLVTX6UMi6uUYR1CxpWPHnpVTt5eoEBAQMPHSfZfqz1AJ7PshSqn4iOgKDdw/MBOJFs/0qslE7e98bGQaAm2DwVP+LXw1EpPlC/q7C9TbQV0TP6FoGOyhQUYrF7ySgOfPnAdDoH7oPhrKyLWMrM5i3X0L5jQ7XIa2EgPxlWbsaT9v6SE4/Qo+D9rrfadmCNTWWa1med1OrcS4WWujz2YOKXmdRX5v1k6Wf2b9lok9qw9FpMHqAaC+tyo5kDoIjOzQS587uydm89u6g+TGEAGNyNHmkQj5fE/WaMcsOtq+VYGCFdoT/pvFJg9a4S1qzC6U67yoPjYgXT9++W9JHOro="
file: "$(ls $TRAVIS_BUILD_DIR/target/scala-2.11/sns-*.jar | tail -1)"
file: "$(ls $TRAVIS_BUILD_DIR/target/scala-2.12/sns-*.jar | tail -1)"
skip_cleanup: true
on:
tags: true
Expand Down
82 changes: 17 additions & 65 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,90 +1,42 @@
name := "sns"

version := "0.1.3"
version := "0.2.0"

scalaVersion := "2.11.8"
scalaVersion := "2.12.4"

// sbt-assembly
assemblyJarName in assembly := s"sns-${version.value}.jar"
test in assembly := {}

libraryDependencies ++= {
val akkaVersion = "2.4.9"
val camelVersion = "2.17.0"
val akkaVersion = "2.5.6"
val akkaHttpVersion = "10.0.10"
val camelVersion = "2.19.4"

libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-http-xml-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
,
"com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % Test,

"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,

"org.slf4j" % "slf4j-api" % "1.7.2",
"ch.qos.logback" % "logback-classic" % "1.0.7",
"com.typesafe.akka" %% "akka-camel" % akkaVersion,
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.228",
"org.apache.camel" % "camel-aws" % camelVersion
exclude("com.amazonaws", "aws-java-sdk-acm")
exclude("com.amazonaws", "aws-java-sdk-api-gateway")
exclude("com.amazonaws", "aws-java-sdk-autoscaling")
exclude("com.amazonaws", "aws-java-sdk-cloudformation")
exclude("com.amazonaws", "aws-java-sdk-cloudfront")
exclude("com.amazonaws", "aws-java-sdk-cloudwatch")
exclude("com.amazonaws", "aws-java-sdk-cloudwatchmetrics")
exclude("com.amazonaws", "aws-java-sdk-codedeploy")
exclude("com.amazonaws", "aws-java-sdk-codecommit")
exclude("com.amazonaws", "aws-java-sdk-codepipeline")
exclude("com.amazonaws", "aws-java-sdk-cognitoidentity")
exclude("com.amazonaws", "aws-java-sdk-cognitosync")
exclude("com.amazonaws", "aws-java-sdk-datapipeline")
exclude("com.amazonaws", "aws-java-sdk-directconnect")
exclude("com.amazonaws", "aws-java-sdk-kinesis")
exclude("com.amazonaws", "aws-java-sdk-opsworks")
exclude("com.amazonaws", "aws-java-sdk-ses")
exclude("com.amazonaws", "aws-java-sdk-cloudsearch")
exclude("com.amazonaws", "aws-java-sdk-swf-libraries")
exclude("com.amazonaws", "aws-java-sdk-lambda")
exclude("com.amazonaws", "aws-java-sdk-ecs")
exclude("com.amazonaws", "aws-java-sdk-workspaces")
exclude("com.amazonaws", "aws-java-sdk-machinelearning")
exclude("com.amazonaws", "aws-java-sdk-directory")
exclude("com.amazonaws", "aws-java-sdk-efs")
exclude("com.amazonaws", "aws-java-sdk-waf")
exclude("com.amazonaws", "aws-java-sdk-marketplacecommerceanalytics")
exclude("com.amazonaws", "aws-java-sdk-inspector")
exclude("com.amazonaws", "aws-java-sdk-iot")
exclude("com.amazonaws", "aws-java-sdk-gamelift")
exclude("com.amazonaws", "aws-java-sdk-simpledb")
exclude("com.amazonaws", "aws-java-sdk-simpleworkflow")
exclude("com.amazonaws", "aws-java-sdk-storagegateway")
exclude("com.amazonaws", "aws-java-sdk-s3")
exclude("com.amazonaws", "aws-java-sdk-route53")
exclude("com.amazonaws", "aws-java-sdk-kms")
exclude("com.amazonaws", "aws-java-sdk-sts")
exclude("com.amazonaws", "aws-java-sdk-rds")
exclude("com.amazonaws", "aws-java-sdk-redshift")
exclude("com.amazonaws", "aws-java-sdk-glacier")
exclude("com.amazonaws", "aws-java-sdk-elasticloadbalancing")
exclude("com.amazonaws", "aws-java-sdk-emr")
exclude("com.amazonaws", "aws-java-sdk-ec2")
exclude("com.amazonaws", "aws-java-sdk-elasticache")
exclude("com.amazonaws", "aws-java-sdk-dynamodb")
exclude("com.amazonaws", "aws-java-sdk-iam")
exclude("com.amazonaws", "aws-java-sdk-cloudtrail")
exclude("com.amazonaws", "aws-java-sdk-elastictranscoder")
exclude("com.amazonaws", "aws-java-sdk-elasticsearch")
exclude("com.amazonaws", "aws-java-sdk-ssm")
exclude("com.amazonaws", "aws-java-sdk-cloudhsm")
exclude("com.amazonaws", "aws-java-sdk-devicefarm")
exclude("com.amazonaws", "aws-java-sdk-elasticbeanstalk")
exclude("com.amazonaws", "aws-java-sdk-ecr")
excludeAll ExclusionRule(organization = "com.amazonaws")
,
"org.apache.camel" % "camel-http" % camelVersion,
"org.apache.camel" % "camel-rabbitmq" % camelVersion,
"org.apache.camel" % "camel-slack" % camelVersion
exclude("junit", "junit")
,
"org.scalatest" %% "scalatest" % "3.0.0" % Test
"org.scalatest" %% "scalatest" % "3.0.4" % Test
)
}

dependencyOverrides += "com.typesafe.akka" %% "akka-actor" % akkaVersion
dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaVersion
4 changes: 4 additions & 0 deletions features/publish.feature
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ Feature: Publish
Then I wait for 1 seconds
Then I should see "Hello, World" in file "./tmp/sns1.log"
Then I should not see "Hello, World" in file "./tmp/sns2.log"

Scenario: Publish in non-existing topic
When I publish a message "Hello, World!" to TopicArn "non-existing-1"
Then The publish request should return "NotFound" error
11 changes: 11 additions & 0 deletions features/step_definitions/publish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,21 @@
@response = $SNS.publish(topic_arn: get_arn(topic), message: message)
end

When(/^I publish a message "([^"]*)" to TopicArn "([^"]*)"$/) do |message, topic_arn|
begin
@response = $SNS.publish(topic_arn: topic_arn, message: message)
rescue => @error
end
end

Then(/^The publish request should be successful$/) do
expect(@response.message_id.length).to be > 0
end

Then(/^The publish request should return "([^"]*)" error$/) do |code|
expect(@error.code).to eq code
end

def file_contains_string(file, message)
File.read(file).index(message)
end
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.9
sbt.version=1.0.3
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0")
26 changes: 18 additions & 8 deletions src/main/scala/me/snov/sns/actor/PublishActor.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package me.snov.sns.actor

import akka.actor.Status.{Failure, Success}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
import me.snov.sns.actor.SubscribeActor.CmdFanOut
import me.snov.sns.model.Message

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object PublishActor {
def props(actor: ActorRef) = Props(classOf[PublishActor], actor)

Expand All @@ -12,17 +19,20 @@ object PublishActor {

class PublishActor(subscribeActor: ActorRef) extends Actor with ActorLogging {
import me.snov.sns.actor.PublishActor._

private def publish(topicArn: String, bodies: Map[String, String]): Message = {

private implicit val timeout = Timeout(1.second)
private implicit val ec = context.dispatcher

private def publish(topicArn: String, bodies: Map[String, String])(implicit ec: ExecutionContext) = {
val message = Message(bodies)

// todo ask
subscribeActor ! CmdFanOut(topicArn, message)

message
(subscribeActor ? CmdFanOut(topicArn, message)).map {
case Failure(e) => Failure(e)
case Success => message
}
}

override def receive = {
case CmdPublish(topicArn, bodies) => sender ! publish(topicArn, bodies)
case CmdPublish(topicArn, bodies) => publish(topicArn, bodies) pipeTo sender
}
}
10 changes: 5 additions & 5 deletions src/main/scala/me/snov/sns/actor/SubscribeActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.UUID
import akka.actor.Status.{Failure, Success}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import me.snov.sns.actor.DbActor.CmdGetConfiguration
import me.snov.sns.model.{Configuration, Message, Subscription, Topic}
import me.snov.sns.model._

object SubscribeActor {
def props(dbActor: ActorRef) = Props(classOf[SubscribeActor], dbActor)
Expand Down Expand Up @@ -39,10 +39,10 @@ class SubscribeActor(dbActor: ActorRef) extends Actor with ActorLogging {

dbActor ! CmdGetConfiguration

def fanOut(topicArn: TopicArn, message: Message) = {
private def fanOut(topicArn: TopicArn, message: Message) = {
try {
if (topics.isDefinedAt(topicArn) && subscriptions.isDefinedAt(topicArn)) {
subscriptions.get(topicArn).get.foreach((s: Subscription) => {
subscriptions(topicArn).foreach((s: Subscription) => {
if (actorPool.isDefinedAt(s)) {
log.debug(s"Sending message ${message.uuid} to ${s.endpoint}")
actorPool(s) ! message
Expand All @@ -51,12 +51,12 @@ class SubscribeActor(dbActor: ActorRef) extends Actor with ActorLogging {
}
})
} else {
throw new RuntimeException(s"Topic not found: $topicArn")
throw new TopicNotFoundException(s"Topic not found: $topicArn")
}

Success
} catch {
case e: RuntimeException => Failure
case e: Throwable => Failure(e)
}
}

Expand Down
19 changes: 11 additions & 8 deletions src/main/scala/me/snov/sns/api/PublishApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import me.snov.sns.actor.PublishActor.CmdPublish
import me.snov.sns.model.Message
import me.snov.sns.model.{Message, TopicNotFoundException}
import me.snov.sns.response.PublishResponse

import spray.json.DefaultJsonProtocol._
import spray.json._
import DefaultJsonProtocol._

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}


object PublishApi {
private val arnPattern = """([\w+_:-]{1,512})""".r

def route(actorRef: ActorRef)(implicit timeout: Timeout, ec: ExecutionContext): Route = {
pathSingleSlash {
formField('Action ! "Publish") {
Expand All @@ -31,8 +31,11 @@ object PublishApi {
case None => Map("default" -> message)
}

(actorRef ? CmdPublish(topic, bodies)).mapTo[Message].map {
PublishResponse.publish
(actorRef ? CmdPublish(topic, bodies)).collect {
case m: Message => PublishResponse.publish(m)
}.recover {
case t: TopicNotFoundException => PublishResponse.topicNotFound(t.getMessage)
case t: Throwable => HttpResponse(500, entity = t.getMessage)
}
}
case _ => complete(HttpResponse(400, entity = "Invalid topic ARN"))
Expand All @@ -41,7 +44,7 @@ object PublishApi {
case e: RuntimeException => complete(HttpResponse(400, entity = e.getMessage))
}
} ~
complete(HttpResponse(400, entity = "TopicArn is required"))
complete(HttpResponse(400, entity = "TopicArn is required"))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/me/snov/sns/model/TopicNotFoundException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package me.snov.sns.model

class TopicNotFoundException(msg: String) extends RuntimeException(msg)

18 changes: 17 additions & 1 deletion src/main/scala/me/snov/sns/response/PublishResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package me.snov.sns.response

import java.util.UUID

import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes._
import me.snov.sns.model.Message

object PublishResponse extends XmlHttpResponse {
def publish(message: Message) = {
def publish(message: Message): HttpResponse = {
response(
OK,
<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>{message.uuid}</MessageId>
Expand All @@ -17,4 +20,17 @@ object PublishResponse extends XmlHttpResponse {
</PublishResponse>
)
}

def topicNotFound(message: String): HttpResponse = {
response(
NotFound,
<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<Error>
<Code>NotFound</Code>
<Message>{message}</Message>
</Error>
<RequestId>{UUID.randomUUID}</RequestId>
</ErrorResponse>
)
}
}
5 changes: 5 additions & 0 deletions src/main/scala/me/snov/sns/response/SubscribeResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package me.snov.sns.response

import java.util.UUID

import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.model.HttpResponse
import me.snov.sns.model.Subscription

object SubscribeResponse extends XmlHttpResponse {
def subscribe(subscription: Subscription) = {
response(
OK,
<SubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<SubscribeResult>
<SubscriptionArn>
Expand All @@ -25,6 +27,7 @@ object SubscribeResponse extends XmlHttpResponse {

def unsubscribe = {
response(
OK,
<UnsubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<ResponseMetadata>
<RequestId>
Expand All @@ -37,6 +40,7 @@ object SubscribeResponse extends XmlHttpResponse {

def list(subscriptions: Iterable[Subscription]): HttpResponse = {
response(
OK,
<ListSubscriptionsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<ListSubscriptionsResult>
<Subscriptions>
Expand Down Expand Up @@ -72,6 +76,7 @@ object SubscribeResponse extends XmlHttpResponse {

def listByTopic(subscriptions: Iterable[Subscription]): HttpResponse = {
response(
OK,
<ListSubscriptionsByTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
<ListSubscriptionsByTopicResult>
<Subscriptions>
Expand Down
Loading

0 comments on commit 5226723

Please sign in to comment.