diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala index 6d7f7dd8..874d2b6d 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus import com.twitter.concurrent.AsyncMutex -import com.twitter.util.Future +import com.twitter.util.{ Future, Return } /** * Provides read-through caching on a readable store fronted by a cache. @@ -46,27 +46,22 @@ class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, mutex.acquire.flatMap { p => cache.put((k, storeValue)) .map { u : Unit => storeValue } - .onFailure { case x: Exception => storeValue } + .rescue { case x: Exception => Future.value(storeValue) } .ensure { p.release } } } } - override def get(k: K): Future[Option[V]] = - cache.get(k).flatMap { cacheValue => - cacheValue match { - case None => getFromBackingStore(k) - case some => Future.value(some) - } - } onFailure { case x: Exception => - getFromBackingStore(k) - } + override def get(k: K): Future[Option[V]] = cache.get(k) transform { + case Return(v @ Some(_)) => Future.value(v) + case _ => getFromBackingStore(k) + } override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { // attempt to read from cache first val cacheResults : Map[K1, Future[Either[Option[V], Exception]]] = cache.multiGet(ks).map { case (k, f) => - (k, f.map { optv => Left(optv) } onFailure { case x: Exception => Right(x) }) + (k, f.map { optv => Left(optv) } rescue { case x: Exception => Future.value(Right(x)) }) } // attempt to read all failed keys and cache misses from backing store @@ -89,4 +84,3 @@ class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, FutureOps.liftValues(ks, f, { (k: K1) => Future.None }) } } - diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala index 6c044ee7..5841adff 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala @@ -40,16 +40,16 @@ class WriteThroughStore[K, V](backingStore: Store[K, V], cache: Store[K, V], inv override def put(kv: (K, Option[V])): Future[Unit] = mutex.acquire.flatMap { p => // write key to backing store first - backingStore.put(kv).flatMap { u: Unit => + backingStore.put(kv).flatMap { _ => // now write key to cache, best effort - cache.put(kv) onFailure { case x: Exception => u } - } onFailure { case x: Exception => + cache.put(kv) rescue { case _ => Future.Unit } + } rescue { case x => // write to backing store failed // now optionally invalidate the key in cache, best effort if (invalidate) { - cache.put((kv._1, None)).flatMap { u: Unit => throw x } onFailure { throw x } + cache.put((kv._1, None)) transform { _ => Future.exception(x) } } else { - throw x + Future.exception(x) } } ensure { p.release @@ -61,7 +61,7 @@ class WriteThroughStore[K, V](backingStore: Store[K, V], cache: Store[K, V], inv // write keys to backing store first val storeResults : Map[K1, Future[Either[Unit, Exception]]] = backingStore.multiPut(kvs).map { case (k, f) => - (k, f.map { u: Unit => Left(u) }.onFailure { case x: Exception => Right(x) }) + (k, f.map { u: Unit => Left(u) }.rescue { case x: Exception => Future.value(Right(x)) }) } // perform cache operations based on how writes to backing store go diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala new file mode 100644 index 00000000..01874331 --- /dev/null +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus + +import com.twitter.util.Future + +import scala.util.Random + +class ExceptionStore[K, V](possibility: Float = 0.5f) extends ConcurrentHashMapStore[K, V] { + private[this] def wrap[A](f: => Future[A]): Future[A] = { + if (Random.nextFloat() < possibility) Future.exception(new RuntimeException()) + else f + } + + override def get(k: K): Future[Option[V]] = wrap(super.get(k)) + + override def put(kv: (K, Option[V])): Future[Unit] = wrap(super.put(kv)) +} diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala index b6c8ba9c..9326bec4 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala @@ -26,5 +26,11 @@ object ReadThroughStoreProperties extends Properties("ReadThroughStoreProperties readableStoreLaws[String, Int] { m => new ReadThroughStore(ReadableStore.fromMap(m), new ConcurrentHashMapStore[String,Int]) } + + property("ReadThroughStore should ignore exceptions on the cache-store") = + readableStoreLaws[String, Int] { m => + new ReadThroughStore(ReadableStore.fromMap(m), + new ExceptionStore()) + } } diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala index 356a07b9..c3349dde 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala @@ -32,5 +32,10 @@ object WriteThroughStoreProperties extends Properties("WriteThroughStoreProperti new WriteThroughStore(new ConcurrentHashMapStore[String,Int], new ConcurrentHashMapStore[String,Int], false) } -} + property("WriteThroughStore should ignore on the cache-store") = + storeTest { + new WriteThroughStore(new ConcurrentHashMapStore[String, Int], + new ExceptionStore(1.0f)) + } +}