Add stores for read-through and write-through caching #220
@@ -0,0 +1,92 @@
/*
 * Copyright 2014 Twitter Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.twitter.storehaus

import com.twitter.concurrent.AsyncMutex
import com.twitter.util.Future
/**
 * Provides read-through caching on a readable store fronted by a cache.
 *
 * Keys are fetched from the backing store on cache misses and cache read failures.
 *
 * All cache operations are best effort, i.e. 'get' will return the value from the
 * backing store even if adding/updating the cached copy fails.
 *
 * On the other hand, any failure while reading from the backing store
 * is propagated to the client.
 *
 * Thread-safety is achieved using a mutex.
 *
 * @author Ruban Monu
 */
class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, V])
  extends ReadableStore[K, V] {

  protected [this] lazy val mutex = new AsyncMutex

  private [this] def getFromBackingStore(k: K): Future[Option[V]] = {
    // attempt to fetch the key from the backing store and
    // write the result to the cache, best effort
    backingStore.get(k).flatMap { storeValue =>
      mutex.acquire.flatMap { p =>
        cache.put((k, storeValue))
          .map { u: Unit => storeValue }
          .onFailure { case x: Exception => storeValue }
          .ensure { p.release }
      }
    }
  }

  override def get(k: K): Future[Option[V]] =
    cache.get(k).flatMap { cacheValue =>
      cacheValue match {
        case None => getFromBackingStore(k)

Review comment: erm.. I think there is a bug in this code now. Imagine two threads coming one after the other in a cache miss scenario. The first thread will do a cache.get, acquire the mutex, and do a get on the backing store. In the meantime the second thread will do a cache.get, get a miss, and block in getFromBackingStore until the first thread is done. Then it will acquire the lock, do a get on the backing store again, and repopulate the cache. I think you may need to do a cache get inside the getFromBackingStore function, just to check that the cache was not already populated by the previous thread.

Reply: I do see a case where we can have threads queued up trying to get a hot key on a cache miss. Also, a cache get is probably going to be less expensive than getting from the backing store again. But is this expected to happen fairly frequently? Just wondering if we need to have an additional cache get in the read path each time to account for this case. What do you think?

Review comment: I think an additional cache get would be fine.

Reply: Thought some more about this. Looks like we'll need to place the cache get inside the mutex block if we want this behavior. The current code doesn't break any store semantics, as the latest value in the backing store is written to the cache. So we can either keep the existing behavior and revisit if this turns out to be a perf issue, or make the mutex block larger. I propose we do the former. @MansurAshraf @johnynek thoughts?
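For illustration only (not part of the diff), here is a minimal sketch of the variant discussed in this thread: the cache is re-checked while the mutex is held, so a second waiter that queued up behind a cache miss is not sent to the backing store for a key the first waiter just populated. The method name is hypothetical, and the PR keeps the original behavior.

// Hypothetical variant sketched for this discussion (not part of the diff):
// re-check the cache while holding the mutex before reading the backing store.
private [this] def getFromBackingStoreChecked(k: K): Future[Option[V]] =
  mutex.acquire.flatMap { p =>
    cache.get(k)
      .handle { case _: Exception => None } // treat a cache read failure as a miss
      .flatMap {
        case hit @ Some(_) => Future.value(hit) // populated by an earlier waiter
        case None =>
          backingStore.get(k).flatMap { storeValue =>
            cache.put((k, storeValue)) // best-effort cache update
              .map { u: Unit => storeValue }
              .handle { case _: Exception => storeValue }
          }
      }
      .ensure { p.release }
  }

The trade-off is the one discussed above: every miss now pays an extra cache read inside the mutex in exchange for avoiding duplicate backing store reads by queued waiters.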
        case some => Future.value(some)

Review comment: This may be premature optimization, so feel free to ignore, but it would be sweet if we only used the mutex in the scenario where there is a cache miss and we have to go to the backing store. In the case of a cache hit there is no need to lock, as it's just a read-only operation. I feel like a cache hit will be the most common scenario, and we should try to make it lock-free if we can.

Reply: Makes sense. In database terms, this would give us the "read committed" isolation level, which should be good enough in this case, I think.
      }
    } onFailure { case x: Exception =>
      getFromBackingStore(k)
    }

  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = {
    // attempt to read from the cache first
    val cacheResults: Map[K1, Future[Either[Option[V], Exception]]] =
      cache.multiGet(ks).map { case (k, f) =>
        (k, f.map { optv => Left(optv) } onFailure { case x: Exception => Right(x) })
      }

    // attempt to read all failed keys and cache misses from the backing store
    val f: Future[Map[K1, Option[V]]] =
      FutureOps.mapCollect(cacheResults).flatMap { cacheResult =>
        val failedKeys = cacheResult.filter { _._2.isRight }.keySet
        val responses = cacheResult.filter { _._2.isLeft }.map { case (k, r) => (k, r.left.get) }
        val hits = responses.filter { !_._2.isEmpty }
        val missedKeys = responses.filter { _._2.isEmpty }.keySet

        FutureOps.mapCollect(backingStore.multiGet(missedKeys ++ failedKeys)).flatMap { storeResult =>
          // write fetched keys to the cache, best effort
          mutex.acquire.flatMap { p =>
            FutureOps.mapCollect(cache.multiPut(storeResult))(FutureCollector.bestEffort[(K1, Unit)])
              .map { u => hits ++ storeResult }
              .ensure { p.release }
          }
        }
      }
    FutureOps.liftValues(ks, f, { (k: K1) => Future.None })
  }
}
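As a usage illustration (not part of the diff), a read-through store is assembled by fronting any ReadableStore with a cache Store. The in-memory stores below are the same ones used in the property tests further down; Await and the example object are only there to keep the sketch self-contained and synchronous.

import com.twitter.storehaus.{ ConcurrentHashMapStore, ReadThroughStore, ReadableStore }
import com.twitter.util.Await

object ReadThroughExample {
  def main(args: Array[String]): Unit = {
    // backing data, stood in for here by an immutable map
    val backing = ReadableStore.fromMap(Map("a" -> 1, "b" -> 2))
    val cache = new ConcurrentHashMapStore[String, Int]
    val store = new ReadThroughStore(backing, cache)

    // the first read misses the cache and falls through to the backing store,
    // populating the cache as a best-effort side effect; later reads hit the cache
    println(Await.result(store.get("a"))) // Some(1)
    println(Await.result(store.get("c"))) // None
  }
}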
@@ -0,0 +1,101 @@
/*
 * Copyright 2014 Twitter Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.twitter.storehaus

import com.twitter.util.Future
/**
 * Provides write-through caching on a store fronted by a cache.
 *
 * All cache operations are best effort, i.e. a write succeeds if the key
 * was written to the backing store, even when adding/updating the cached copy fails.
 *
 * On the other hand, any failure while writing to the backing store
 * is propagated to the client.
 *
 * If the invalidate flag is set, any keys that fail to write to the backing store
 * are removed from the cache (best effort), rather than leaving the old value
 * in the cache.
 *
 * Thread-safety is achieved using a mutex.
 *
 * @author Ruban Monu
 */
class WriteThroughStore[K, V](backingStore: Store[K, V], cache: Store[K, V], invalidate: Boolean = true)
  extends ReadThroughStore[K, V](backingStore, cache) with Store[K, V] {

  override def put(kv: (K, Option[V])): Future[Unit] = mutex.acquire.flatMap { p =>
    // write the key to the backing store first
    backingStore.put(kv).flatMap { u: Unit =>
      // now write the key to the cache, best effort
      cache.put(kv) onFailure { case x: Exception => u }
    } onFailure { case x: Exception =>
      // the write to the backing store failed;
      // now optionally invalidate the key in the cache, best effort
      if (invalidate) {
        cache.put((kv._1, None)).flatMap { u: Unit => throw x } onFailure { throw x }
      } else {
        throw x
      }
    } ensure {
      p.release
    }
  }

  override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
    val f: Future[Map[K1, Either[Unit, Exception]]] = mutex.acquire.flatMap { p =>
      // write keys to the backing store first
      val storeResults: Map[K1, Future[Either[Unit, Exception]]] =
        backingStore.multiPut(kvs).map { case (k, f) =>
          (k, f.map { u: Unit => Left(u) }.onFailure { case x: Exception => Right(x) })
        }

      // perform cache operations based on how the writes to the backing store went
      FutureOps.mapCollect(storeResults).flatMap { storeResult: Map[K1, Either[Unit, Exception]] =>
        val failedKeys = storeResult.filter { _._2.isRight }.keySet
        val succeededKeys = storeResult.filter { _._2.isLeft }.keySet

        // write updated keys to the cache, best effort
        val keysToUpdate = kvs.filterKeys { succeededKeys.contains(_) } ++ {
          // optionally invalidate the cached copy when the write to the backing store fails
          if (invalidate) { failedKeys.map { k => (k, None) }.toMap }
          else { Map.empty }
        }

        FutureOps.mapCollect(cache.multiPut(keysToUpdate))(FutureCollector.bestEffort[(K1, Unit)])
          .map { f => storeResult }
        // return the original writes made to the backing store
        // once the cache operations are complete
      } ensure {
        p.release
      }
    }

    // throw the original exception for any writes that failed
    FutureOps.liftValues(kvs.keySet, f, { (k: K1) => Future.None })
      .map { case kv: (K1, Future[Either[Unit, Exception]]) =>
        val transform = kv._2.map { v =>
          v match {
            case Left(optv) => optv
            case Right(x) => throw x
          }
        }
        (kv._1, transform)
      }
  }
}
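Similarly, a usage sketch for the write-through store (not part of the diff); the example object and Await are again only for illustration.

import com.twitter.storehaus.{ ConcurrentHashMapStore, WriteThroughStore }
import com.twitter.util.Await

object WriteThroughExample {
  def main(args: Array[String]): Unit = {
    val backing = new ConcurrentHashMapStore[String, Int]
    val cache = new ConcurrentHashMapStore[String, Int]
    // invalidate = true: a failed backing store write also evicts the cached copy
    val store = new WriteThroughStore(backing, cache, invalidate = true)

    // the write goes to the backing store first, then to the cache (best effort)
    Await.result(store.put(("a", Some(1))))
    // reads are inherited from ReadThroughStore and served from the cache when possible
    println(Await.result(store.get("a"))) // Some(1)
    // putting None deletes the key from both stores
    Await.result(store.put(("a", None)))
    println(Await.result(store.get("a"))) // None
  }
}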
@@ -0,0 +1,30 @@
/*
 * Copyright 2014 Twitter Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.twitter.storehaus

import org.scalacheck.Properties
import org.scalacheck.Prop._

object ReadThroughStoreProperties extends Properties("ReadThroughStoreProperties") {
  import ReadableStoreProperties.readableStoreLaws

  property("ReadThroughStore obeys the ReadableStore laws") =
    readableStoreLaws[String, Int] { m =>
      new ReadThroughStore(ReadableStore.fromMap(m), new ConcurrentHashMapStore[String, Int])
    }
}
@@ -0,0 +1,36 @@
/*
 * Copyright 2014 Twitter Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.twitter.storehaus

import org.scalacheck.Properties

object WriteThroughStoreProperties extends Properties("WriteThroughStoreProperties") {
  import StoreProperties.storeTest

  property("WriteThroughStore with invalidation obeys the Store laws") =
    storeTest {
      new WriteThroughStore(new ConcurrentHashMapStore[String, Int],
        new ConcurrentHashMapStore[String, Int], true)
    }

  property("WriteThroughStore with no invalidation obeys the Store laws") =
    storeTest {
      new WriteThroughStore(new ConcurrentHashMapStore[String, Int],
        new ConcurrentHashMapStore[String, Int], false)
    }
}
Review comment: what if the cache fails?

Reply: I think we should fetch the key from the backing store, as you suggested as well, treating a cache miss and a cache failure the same.
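A small sketch of the behavior agreed on here, assuming the ReadThroughStore members above; it paraphrases the intent rather than the exact diff, and uses rescue so the cache failure actually recovers into the backing store read. The method name is hypothetical.

// Sketch: a cache read failure degrades to a cache miss, so the key is
// still served from the backing store rather than surfacing the cache error.
def getTreatingFailureAsMiss(k: K): Future[Option[V]] =
  cache.get(k).flatMap {
    case None => getFromBackingStore(k) // cache miss
    case hit @ Some(_) => Future.value(hit) // cache hit
  }.rescue {
    case _: Exception => getFromBackingStore(k) // cache failure treated as a miss
  }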