diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala index 9202c756..1887036b 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala @@ -71,6 +71,14 @@ object MergeableStore { fc: FutureCollector[(K, Option[V])]): MergeableStore[K,V] = new MergeableStoreViaGetPut[K, V](store, fc) + /** Create a mergeable by implementing merge with single get followed by put for each key. Also forces multiGet and + * multiPut to use the store's default implementation of a single get and put. + * The merge is only safe if each key is owned by a single thread. Useful in certain cases where multiGets and + * multiPuts may result in higher error rates or lower throughput. + */ + def fromStoreNoMulti[K,V](store: Store[K,V])(implicit sg: Semigroup[V]): MergeableStore[K,V] = + new MergeableStoreViaSingleGetPut[K, V](store) + /** Create a mergeable by implementing merge with get followed by put. * Only safe if each key is owned by a single thread. * This deletes zeros on put, but returns zero on empty (never returns None). diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStoreViaGetPut.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStoreViaGetPut.scala index c9e6d92f..3557516a 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStoreViaGetPut.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStoreViaGetPut.scala @@ -25,14 +25,12 @@ import com.twitter.util.Future * writer thread per key. Otherwise you need to do some locking or compare-and-swap * in the store */ -class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])]) - extends MergeableStore[K, V] { + +class MergeableStoreViaSingleGetPut[-K, V: Semigroup](store: Store[K, V]) extends MergeableStore[K, V] { override def semigroup: Semigroup[V] = implicitly[Semigroup[V]] override def get(k: K) = store.get(k) - override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks) override def put(kv: (K, Option[V])) = store.put(kv) - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs) /** * sets to .plus(get(kv._1).get.getOrElse(monoid.zero), kv._2) @@ -44,6 +42,13 @@ class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCo newVOpt = vOpt.map(Semigroup.plus(_, kv._2)).orElse(Some(kv._2)) finalUnit <- put((kv._1, newVOpt)) } yield vOpt +} + +class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])]) + extends MergeableStoreViaSingleGetPut[K, V](store) { + + override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks) + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs) override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = { implicit val collector = fc