Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MergeableStore.fromStoreNoMulti that does single get then put #201

Merged
merged 3 commits into from
Dec 18, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ object MergeableStore {
fc: FutureCollector[(K, Option[V])]): MergeableStore[K,V] =
new MergeableStoreViaGetPut[K, V](store, fc)

/** Create a mergeable by implementing merge with single get followed by put for each key. Also forces multiGet and
* multiPut to use the store's default implementation of a single get and put.
* The merge is only safe if each key is owned by a single thread. Useful in certain cases where multiGets and
* multiPuts may result in higher error rates or lower throughput.
*/
def fromStoreNoMulti[K,V](store: Store[K,V])(implicit sg: Semigroup[V]): MergeableStore[K,V] =
new MergeableStoreViaSingleGetPut[K, V](store)

/** Create a mergeable by implementing merge with get followed by put.
* Only safe if each key is owned by a single thread.
* This deletes zeros on put, but returns zero on empty (never returns None).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ import com.twitter.util.Future
* writer thread per key. Otherwise you need to do some locking or compare-and-swap
* in the store
*/
class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])])
extends MergeableStore[K, V] {

class MergeableStoreViaSingleGetPut[-K, V: Semigroup](store: Store[K, V]) extends MergeableStore[K, V] {
override def semigroup: Semigroup[V] = implicitly[Semigroup[V]]

override def get(k: K) = store.get(k)
override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks)
override def put(kv: (K, Option[V])) = store.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs)

/**
* sets to .plus(get(kv._1).get.getOrElse(monoid.zero), kv._2)
Expand All @@ -44,6 +42,13 @@ class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCo
newVOpt = vOpt.map(Semigroup.plus(_, kv._2)).orElse(Some(kv._2))
finalUnit <- put((kv._1, newVOpt))
} yield vOpt
}

class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])])
extends MergeableStoreViaSingleGetPut[K, V](store) {

override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs)

override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
implicit val collector = fc
Expand Down