Skip to content

Commit

Permalink
Merge pull request #19 from twitter/develop
Browse files Browse the repository at this point in the history
Merge from twitter/storehaus
  • Loading branch information
rubanm committed Dec 8, 2014
2 parents 51623db + 7cb17e5 commit 9ac82a0
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 19 deletions.
9 changes: 9 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ object StorehausBuild extends Build {
storehausKafka08,
storehausMongoDB,
storehausElastic,
storehausHttp,
storehausTesting
)

Expand Down Expand Up @@ -292,4 +293,12 @@ object StorehausBuild extends Build {
javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) }
).dependsOn(storehausCore, storehausAlgebra, storehausCache)

lazy val storehausHttp = module("http").settings(
libraryDependencies ++= Seq(
Finagle.module("http"),
"com.twitter" %% "bijection-netty" % bijectionVersion
)
).dependsOn(storehausCore)


}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.0
sbt.version=0.13.5
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
resolvers ++= Seq(
"jgit-repo" at "http://download.eclipse.org/jgit/maven",
"sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases"
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases"
)

addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")
Expand All @@ -9,4 +9,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")

addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
2 changes: 1 addition & 1 deletion sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Author: Paul Phillips <[email protected]>

# todo - make this dynamic
declare -r sbt_release_version=0.13.0
declare -r sbt_release_version=0.13.5

declare sbt_jar sbt_dir sbt_create sbt_launch_dir
declare scala_version java_home sbt_explicit_version
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2014 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus

import com.twitter.conversions.time._
import com.twitter.util.{JavaTimer, Timer}

import org.scalacheck.Properties

object RetryingStoreProperties extends Properties("RetryingStore") {
import StoreProperties.storeTest

implicit val timer: Timer = new JavaTimer(true)

property("RetryingStore obeys the Store laws, assuming the underlying Store always returns results before timeout") =
storeTest[String, Int] {
Store.withRetry[String, Int](
store = new ConcurrentHashMapStore[String,Int](),
backoffs = for (i <- 0 until 3) yield 1.milliseconds
)(_ => true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.algebra.MergeableStore

import scala.util.Try

import com.amazonaws.regions.{ Region, Regions }
import com.amazonaws.services.dynamodbv2.model._

import AwsBijections._

object DynamoLongStore {
def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String) =
new DynamoLongStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn))
def apply(awsAccessKey: String, awsSecretKey: String, tableName: String,
primaryKeyColumn: String, valueColumn: String,
endpoint: Regions = Regions.US_EAST_1) =

new DynamoLongStore(DynamoStore(awsAccessKey, awsSecretKey, tableName,
primaryKeyColumn, valueColumn, endpoint))
}

class DynamoLongStore(underlying: DynamoStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.twitter.util.{ Future, FuturePool }
import com.twitter.storehaus.Store

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{ Region, Regions }
import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDBClient, AmazonDynamoDB }
import com.amazonaws.services.dynamodbv2.model._

Expand All @@ -34,19 +35,25 @@ import AwsBijections._

object DynamoStore {

def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String): DynamoStore = {
def apply(awsAccessKey: String, awsSecretKey: String, tableName: String,
primaryKeyColumn: String, valueColumn: String,
endpoint: Regions = Regions.US_EAST_1): DynamoStore = {

val processors = Runtime.getRuntime.availableProcessors
this(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn, processors)
this(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn,
processors, endpoint)
}

def apply(awsAccessKey: String, awsSecretKey: String, tableName: String,
primaryKeyColumn: String, valueColumn: String, numberWorkerThreads: Int): DynamoStore = {
primaryKeyColumn: String, valueColumn: String, numberWorkerThreads: Int,
endpoint: Regions): DynamoStore = {

val auth = new BasicAWSCredentials(awsAccessKey, awsSecretKey)
val client = new AmazonDynamoDBClient(auth)
new DynamoStore(client, tableName, primaryKeyColumn, valueColumn, numberWorkerThreads)
var client = new AmazonDynamoDBClient(auth)
client.setRegion(Region.getRegion(endpoint));
new DynamoStore(client, tableName, primaryKeyColumn, valueColumn,
numberWorkerThreads)
}

}

class DynamoStore(val client: AmazonDynamoDB, val tableName: String,
Expand Down Expand Up @@ -75,9 +82,7 @@ class DynamoStore(val client: AmazonDynamoDB, val tableName: String,

apiRequestFuturePool(client.deleteItem(deleteRequest))
}

}

}

override def get(k: String): Future[Option[AttributeValue]] = {
Expand All @@ -88,6 +93,4 @@ class DynamoStore(val client: AmazonDynamoDB, val tableName: String,
Option(client.getItem(getRequest).getItem).map(_.get(valueColumn))
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ import java.util.{ Map => JMap }

import com.twitter.storehaus.ConvertedStore

import com.amazonaws.regions.{ Region, Regions }
import com.amazonaws.services.dynamodbv2.model._

import AwsBijections._

object DynamoStringStore {
def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String) =
new DynamoStringStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn))
def apply(awsAccessKey: String, awsSecretKey: String, tableName: String,
primaryKeyColumn: String, valueColumn: String,
endpoint: Regions = Regions.US_EAST_1) =

new DynamoStringStore(DynamoStore(awsAccessKey, awsSecretKey, tableName,
primaryKeyColumn, valueColumn, endpoint))
}

class DynamoStringStore(underlying: DynamoStore)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2014 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.http

import java.nio.charset.Charset
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpRequest, HttpVersion, HttpMethod, HttpHeaders, HttpResponseStatus }
import com.twitter.util.Future
import com.twitter.bijection.StringCodec
import com.twitter.bijection.netty.ChannelBufferBijection
import com.twitter.finagle.{ Service, Http }
import com.twitter.storehaus.{ Store, ConvertedStore }

object HttpException {
def apply(response: HttpResponse): HttpException =
new HttpException(response.getStatus.getCode, response.getStatus.getReasonPhrase, response.getContent.toString(Charset.forName("UTF-8")))
}

case class HttpException(code: Int, reasonPhrase: String, content: String) extends Exception(reasonPhrase + Option(content).map("\n" + _ ).getOrElse(""))

object HttpStore {
def apply(dest: String): HttpStore = new HttpStore(Http.newService(dest))
}

class HttpStore(val client: Service[HttpRequest, HttpResponse]) extends Store[String, ChannelBuffer] {
override def get(k: String): Future[Option[ChannelBuffer]] = {
val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, k)
request.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0")
client(request).map{ response =>
response.getStatus match {
case HttpResponseStatus.OK => Some(response.getContent)
case HttpResponseStatus.NOT_FOUND => None
case _ => throw HttpException(response)
}
}
}

override def put(kv: (String, Option[ChannelBuffer])): Future[Unit] = {
val request = kv match {
case (k, Some(cb)) =>
val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, k)
req.setContent(cb)
req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, cb.readableBytes.toString)
req
case (k, None) =>
val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, k)
req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0")
req
}
client(request).map{ response =>
response.getStatus match {
case HttpResponseStatus.OK => ()
case HttpResponseStatus.CREATED => ()
case HttpResponseStatus.NO_CONTENT => ()
case _ => throw HttpException(response)
}
}
}
}

object HttpStringStore {
def apply(dest: String): HttpStringStore = new HttpStringStore(Http.newService(dest))
}

class HttpStringStore(val client: Service[HttpRequest, HttpResponse])
extends ConvertedStore[String, String, ChannelBuffer, String](new HttpStore(client))(identity)(
StringCodec.utf8 andThen ChannelBufferBijection.inverse)
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2014 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.http

import java.util.concurrent.ConcurrentHashMap
import java.nio.charset.Charset
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpResponse, HttpResponseStatus, HttpMethod, HttpHeaders }
import com.twitter.util.{ Await, Future }
import com.twitter.finagle.{ Service, Http, ListeningServer }
import com.twitter.storehaus.{ FutureOps, Store }
import com.twitter.storehaus.testing.CloseableCleanup
import com.twitter.storehaus.testing.generator.NonEmpty
import org.scalacheck.{ Arbitrary, Gen, Properties }
import org.scalacheck.Prop._

object HttpStringStoreProperties extends Properties("HttpStringStore") with CloseableCleanup[ListeningServer] {
def validPairs: Gen[List[(String, Option[String])]] =
NonEmpty.Pairing.alphaStrs().map(_.map{ case (k, v) => ("/" + k, v) })

def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]])
(put: (Store[K, V], List[(K, Option[V])]) => Unit) =
forAll(validPairs) { (examples: List[(K, Option[V])]) =>
put(store, examples)
examples.toMap.forall { case (k, optV) =>
val res = Await.result(store.get(k))
Equiv[Option[V]].equiv(res, optV)
}
}

def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) =
baseTest(store, validPairs) { (s, pairs) =>
pairs.foreach {
case (k, v) =>
Await.result(s.put((k, v)))
}
}

def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) =
baseTest(store, validPairs) { (s, pairs) =>
Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap)))
}

def storeTest(store: Store[String, String]) =
putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs)

val service = new Service[HttpRequest, HttpResponse] {
private val map = new ConcurrentHashMap[String, String]()
private val utf8 = Charset.forName("UTF-8")

def apply(request: HttpRequest): Future[HttpResponse] = {
val response = request.getMethod match {
case HttpMethod.GET =>
Option(map.get(request.getUri)).map{ v =>
val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK)
val content = ChannelBuffers.wrappedBuffer(v.getBytes(utf8))
resp.setContent(content)
resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes.toString)
resp
}.getOrElse {
val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NOT_FOUND)
resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0")
resp
}
case HttpMethod.DELETE =>
map.remove(request.getUri)
val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NO_CONTENT)
resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0")
resp
case HttpMethod.PUT =>
val maybeOldV = Option(map.put(request.getUri, request.getContent.toString(utf8)))
val resp = new DefaultHttpResponse(request.getProtocolVersion, maybeOldV.map(_ => HttpResponseStatus.OK).getOrElse(HttpResponseStatus.CREATED))
resp.setContent(request.getContent)
resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, request.getContent.readableBytes.toString)
resp
case _ =>
new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.METHOD_NOT_ALLOWED)
}
Future.value(response)
}
}

val server = Http.serve("localhost:0", service)

val store = HttpStringStore(server.boundAddress.toString.substring(1)) // i dont know how else to convert boundAddress into something usable

property("HttpStringStore test") = storeTest(store)

override def closeable = server

override def cleanup() = {
println("closing server")
super.cleanup()
}
}

0 comments on commit 9ac82a0

Please sign in to comment.