From 77a6a16c266b39f3c634fdbd829a3011a34d635b Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sun, 12 Oct 2014 18:45:09 -0400 Subject: [PATCH 1/4] implementation of store that uses http protocol --- project/Build.scala | 7 ++ .../twitter/storehaus/http/HttpStore.scala | 87 ++++++++++++++ .../http/HttpStringStoreProperties.scala | 109 ++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala create mode 100644 storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala diff --git a/project/Build.scala b/project/Build.scala index 3f6d2712..8f3bd932 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -292,4 +292,11 @@ object StorehausBuild extends Build { javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } ).dependsOn(storehausCore, storehausAlgebra, storehausCache) + lazy val storehausHttp = module("http").settings( + libraryDependencies ++= Seq( + Finagle.module("http") + ) + ).dependsOn(storehausCore) + + } diff --git a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala new file mode 100644 index 00000000..0c9e31c8 --- /dev/null +++ b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.http + +import java.nio.charset.Charset +import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } +import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpRequest, HttpVersion, HttpMethod, HttpHeaders, HttpResponseStatus } +import com.twitter.util.Future +import com.twitter.finagle.{ Service, Http } +import com.twitter.storehaus.Store + +object HttpException { + def apply(response: HttpResponse): HttpException = + new HttpException(response.getStatus.getCode, response.getStatus.getReasonPhrase, response.getContent.toString(Charset.forName("UTF-8"))) +} + +case class HttpException(code: Int, reasonPhrase: String, content: String) extends Exception(s"""${reasonPhrase}\n${content}""") + +object HttpStore { + def apply(dest: String): HttpStore = new HttpStore(Http.newService(dest)) +} + +class HttpStore(val client: Service[HttpRequest, HttpResponse]) extends Store[String, ChannelBuffer] { + override def get(k: String): Future[Option[ChannelBuffer]] = { + val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, k) + request.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + client(request).map{ response => + response.getStatus match { + case HttpResponseStatus.OK => Some(response.getContent) + case HttpResponseStatus.NOT_FOUND => None + case _ => throw HttpException(response) + } + } + } + + override def put(kv: (String, Option[ChannelBuffer])): Future[Unit] = { + val request = kv match { + case (k, Some(cb)) => + val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, k) + req.setContent(cb) + req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, cb.readableBytes.toString) + req + case (k, None) => + val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, k) + req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + req + } + client(request).map{ response => + response.getStatus match { + case HttpResponseStatus.OK => () + case HttpResponseStatus.CREATED => () + case HttpResponseStatus.NO_CONTENT => () + case _ => throw HttpException(response) + } + } + } +} + +object HttpStringStore { + def apply(dest: String): HttpStringStore = new HttpStringStore(Http.newService(dest)) +} + +class HttpStringStore(val client: Service[HttpRequest, HttpResponse]) extends Store[String, String] { + private val store = new HttpStore(client) + private val utf8 = Charset.forName("UTF-8") + + override def get(k: String): Future[Option[String]] = store.get(k).map(_.map(_.toString(utf8))) + + override def put(kv: (String, Option[String])): Future[Unit] = store.put(( + kv._1, + kv._2.map(s => ChannelBuffers.wrappedBuffer(s.getBytes(utf8))) + )) +} diff --git a/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala b/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala new file mode 100644 index 00000000..853897a8 --- /dev/null +++ b/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala @@ -0,0 +1,109 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.http + +import java.util.concurrent.ConcurrentHashMap +import java.nio.charset.Charset +import org.jboss.netty.buffer.ChannelBuffers +import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpResponse, HttpResponseStatus, HttpMethod, HttpHeaders } +import com.twitter.util.{ Await, Future } +import com.twitter.finagle.{ Service, Http, ListeningServer } +import com.twitter.storehaus.{ FutureOps, Store } +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.{ Arbitrary, Gen, Properties } +import org.scalacheck.Prop._ + +object HttpStringStoreProperties extends Properties("HttpStringStore") with CloseableCleanup[ListeningServer] { + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs().map(_.map{ case (k, v) => ("/" + k, v) }) + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { (s, pairs) => + pairs.foreach { + case (k, v) => + Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + val service = new Service[HttpRequest, HttpResponse] { + private val map = new ConcurrentHashMap[String, String]() + private val utf8 = Charset.forName("UTF-8") + + def apply(request: HttpRequest): Future[HttpResponse] = { + val response = request.getMethod match { + case HttpMethod.GET => + Option(map.get(request.getUri)).map{ v => + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK) + val content = ChannelBuffers.wrappedBuffer(v.getBytes(utf8)) + resp.setContent(content) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes.toString) + resp + }.getOrElse { + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NOT_FOUND) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + resp + } + case HttpMethod.DELETE => + map.remove(request.getUri) + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NO_CONTENT) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + resp + case HttpMethod.PUT => + val maybeOldV = Option(map.put(request.getUri, request.getContent.toString(utf8))) + val resp = new DefaultHttpResponse(request.getProtocolVersion, maybeOldV.map(_ => HttpResponseStatus.OK).getOrElse(HttpResponseStatus.CREATED)) + resp.setContent(request.getContent) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, request.getContent.readableBytes.toString) + resp + case _ => + new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.METHOD_NOT_ALLOWED) + } + Future.value(response) + } + } + + val server = Http.serve("localhost:0", service) + + val store = HttpStringStore(server.boundAddress.toString.substring(1)) // i dont know how else to convert boundAddress into something usable + + property("HttpStringStore test") = storeTest(store) + + override def closeable = server + + override def cleanup() = { + println("closing server") + super.cleanup() + } +} From f9af91f61d8ff400d4f902bc319398c290889b17 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 16 Oct 2014 23:07:27 -0400 Subject: [PATCH 2/4] woops forgot about scala 2.9 --- .../src/main/scala/com/twitter/storehaus/http/HttpStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala index 0c9e31c8..95d3c95b 100644 --- a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala +++ b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala @@ -28,7 +28,7 @@ object HttpException { new HttpException(response.getStatus.getCode, response.getStatus.getReasonPhrase, response.getContent.toString(Charset.forName("UTF-8"))) } -case class HttpException(code: Int, reasonPhrase: String, content: String) extends Exception(s"""${reasonPhrase}\n${content}""") +case class HttpException(code: Int, reasonPhrase: String, content: String) extends Exception(reasonPhrase + Option(content).map("\n" + _ ).getOrElse("")) object HttpStore { def apply(dest: String): HttpStore = new HttpStore(Http.newService(dest)) From 744625b73b19b3fe63fb14a2708d72b02d22fa4d Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 17 Oct 2014 18:16:47 -0400 Subject: [PATCH 3/4] use ConvertedStore and Injection for HttpStringStore --- project/Build.scala | 3 ++- .../com/twitter/storehaus/http/HttpStore.scala | 18 ++++++------------ 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 8f3bd932..30a555b3 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -294,7 +294,8 @@ object StorehausBuild extends Build { lazy val storehausHttp = module("http").settings( libraryDependencies ++= Seq( - Finagle.module("http") + Finagle.module("http"), + "com.twitter" %% "bijection-netty" % bijectionVersion ) ).dependsOn(storehausCore) diff --git a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala index 95d3c95b..39dee3c8 100644 --- a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala +++ b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala @@ -20,8 +20,10 @@ import java.nio.charset.Charset import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpRequest, HttpVersion, HttpMethod, HttpHeaders, HttpResponseStatus } import com.twitter.util.Future +import com.twitter.bijection.StringCodec +import com.twitter.bijection.netty.ChannelBufferBijection import com.twitter.finagle.{ Service, Http } -import com.twitter.storehaus.Store +import com.twitter.storehaus.{ Store, ConvertedStore } object HttpException { def apply(response: HttpResponse): HttpException = @@ -74,14 +76,6 @@ object HttpStringStore { def apply(dest: String): HttpStringStore = new HttpStringStore(Http.newService(dest)) } -class HttpStringStore(val client: Service[HttpRequest, HttpResponse]) extends Store[String, String] { - private val store = new HttpStore(client) - private val utf8 = Charset.forName("UTF-8") - - override def get(k: String): Future[Option[String]] = store.get(k).map(_.map(_.toString(utf8))) - - override def put(kv: (String, Option[String])): Future[Unit] = store.put(( - kv._1, - kv._2.map(s => ChannelBuffers.wrappedBuffer(s.getBytes(utf8))) - )) -} +class HttpStringStore(val client: Service[HttpRequest, HttpResponse]) + extends ConvertedStore[String, String, ChannelBuffer, String](new HttpStore(client))(identity)( + StringCodec.utf8 andThen ChannelBufferBijection.inverse) From 81549bcbe6dafb3e87587b609842db6405bbcccd Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Mon, 20 Oct 2014 18:39:46 -0400 Subject: [PATCH 4/4] add storehausHttp to root project --- project/Build.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/Build.scala b/project/Build.scala index 30a555b3..4bb2e30d 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -145,6 +145,7 @@ object StorehausBuild extends Build { storehausKafka08, storehausMongoDB, storehausElastic, + storehausHttp, storehausTesting )