Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for incorrect error message in MergeableStoreViaGetPut #334

Merged
merged 16 commits into from
Dec 19, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package com.twitter.storehaus.algebra

import com.twitter.algebird.{Semigroup, Monoid}
import com.twitter.algebird.{Monoid, Semigroup}
import com.twitter.bijection.ImplicitBijection
import com.twitter.storehaus.{FutureOps, FutureCollector, Store}
import com.twitter.util.Future

import com.twitter.storehaus.{FutureCollector, FutureOps, Store}
import com.twitter.util.{Future, Promise, Return, Throw, Try}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReferenceArray}
import scala.language.implicitConversions
import scala.reflect.ClassTag

/** Main trait to represent stores that are used for aggregation */
trait MergeableStore[-K, V] extends Store[K, V] with Mergeable[K, V]
Expand All @@ -31,25 +32,29 @@ object MergeableStore {
implicit def enrich[K, V](store: MergeableStore[K, V]): EnrichedMergeableStore[K, V] =
new EnrichedMergeableStore(store)

private[this] def addOpt[V](init: Option[V], inc: V)(implicit sg: Semigroup[V]): Option[V] = init match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. the latest algebird has Semigroup.maybePlus this does this, but it always returns V not Option[V] which I think is correct (you can just box in Some as needed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Don't want to merge latest algebird in this change. Is it ok to simplify this separate review where I can merge algebird. I've left a todo for now.

case Some(i) => Some(sg.plus(i, inc))
case None => Some(inc)
}

/**
* Implements multiMerge functionality in terms of an underlying
* store's multiGet and multiSet.
*/
def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V],
missingfn: (K) => Future[Option[V]] = FutureOps.missingValueFor _)
(implicit collect: FutureCollector, sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V])
(implicit sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
val keySet = kvs.keySet
val mGetResult: Seq[Future[(K, (Option[V], Option[V]))]] =
store.multiGet(keySet).iterator.map {
case (k, futureOptV) =>
futureOptV.map { init =>
val incV = kvs(k)
val resV = init.map(Semigroup.plus(_, incV)).orElse(Some(incV))
k -> ((init, resV))
}
}.toIndexedSeq
val mGetResult: Map[K, Future[(Option[V], Option[V])]] =
store.multiGet(keySet).map { case (k, futureOptV) =>
val newFOptV = futureOptV.map { oldV: Option[V] =>
val incV = kvs(k)
val newV = addOpt(oldV, incV)
(oldV, newV)
}
k -> newFOptV
}

val collectedMGetResult: Future[Seq[(K, (Option[V], Option[V]))]] = collect { mGetResult }
val (collectedMGetResult, getFailures) = collectWithFailures(mGetResult)

val mPutResultFut: Future[Map[K, Future[Option[V]]]] =
collectedMGetResult.map { pairs: Seq[(K, (Option[V], Option[V]))] =>
Expand All @@ -61,10 +66,58 @@ object MergeableStore {
}

/**
* Combine original keys with result after put. Some keys might have dropped,
* fill those using missingfn.
* Combine original keys with result after put.
*/
FutureOps.liftFutureValues(keySet, mPutResultFut, missingfn)
FutureOps.liftFutureValues(keySet, mPutResultFut, getFailures)
}

/**
* Collects keyed futures, partitioning out the failures.
*/
private[algebra] def collectWithFailures[K, V](fs:Map[K, Future[V]]): (Future[Seq[(K, V)]], Map[K, Future[Nothing]]) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put spaces in types: fs: Map...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not crazy about using Seq. I'd rather use List or Vector so the contracts are clearer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is to be private, it can even be Array.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with Array.

if (fs.isEmpty) {
(Future.value(Seq.empty[(K, V)]), Map.empty[K, Future[Nothing]])
} else {
val fsSize = fs.size
val results = new AtomicReferenceArray[Either[(K,Future[Nothing]), (K, V)]](fsSize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about AtomicReferenceArray[(K, Either[Throwable, V])] here instead. Seems easier to follow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

val countdown = new AtomicInteger(fsSize)
val successCount = new AtomicInteger(0)
val pSuccess = new Promise[Seq[(K, V)]]
var failures = Map.empty[K, Future[Nothing]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this need to be an AtomicReference? How is it safe to modify this var from many threads?


def collectResults() = {
if (countdown.decrementAndGet() == 0) {
val successArray = new Array[(K, V)](successCount.get())
var si = 0
var ri = 0
while (ri < fsSize) {
results.get(ri) match {
case Right(kv) =>
successArray(si) = kv
si += 1
case Left(kv) =>
failures = failures + kv
}
ri += 1
}
pSuccess.setValue(successArray)
}
}

for (((k, f), i) <- fs.iterator.zipWithIndex) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are going to geek out on while loops above, doing the same here is probably worth it:

val mapIt = fs.iterator
var i = 0
while(mapIt.hasNext) {
  val kv = mapIt.next
  fv._2.respond {
    case Return(v) =>...
  }
  i += 0
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this, it does add a bit of complexity. Also does avoid one wrapping/unwraping for each item in the map.

f respond {
case Return(v) =>
results.set(i, Right(k -> v))
successCount.incrementAndGet()
collectResults()
case t@Throw(cause) =>
val failure = k -> Future.const(Throw(cause))
results.set(i, Left(failure))
collectResults()
}
}
(pSuccess, failures)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failures has to be a future right? You are returning a copy here, but the futures are not done yet. If you could have a Map you could see what keys have failed, and you can't can you? I think this has to be Future[Map[K, ..]]. I am not sure Future[Nothing] is a better type that Throwable though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was totally wrong. I've fixed it now and added tests as well.

}
}

/** unpivot or uncurry this MergeableStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ class MergeableStoreViaGetPut[-K, V: Semigroup](
store.multiPut(kvs)

override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
MergeableStore.multiMergeFromMultiSet(this, kvs)(fc, semigroup)
MergeableStore.multiMergeFromMultiSet(this, kvs)(semigroup)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,26 +190,24 @@ object MergeableStoreProperties extends Properties("MergeableStore") {
override def put(kv: (K, Option[V])): Future[Unit] = Future.Unit
}

property("multiMergeFromMultiSet handles store failures correctly " +
"with bestEffort FutureCollector") = {
property("multiMergeFromMultiSet handles store failures correctly") = {
val intSg = implicitly[Semigroup[Int]]
val storeExceptionFut = Future.exception[Option[Int]](new RuntimeException("Failure in store"))
val storeException = new RuntimeException("Failure in store")
val storeExceptionFut = Future.exception[Option[Int]](storeException)
val missingException = new RuntimeException("Missing value")
val missingExceptionFut = Future.exception[Option[Int]](missingException)
val missingfn: Int => Future[Option[Int]] = _ => missingExceptionFut

forAll { (ins: Map[Int, Int]) =>
val result = MergeableStore.multiMergeFromMultiSet(
new MockStore[Int, Int](storeExceptionFut), ins, missingfn)(
FutureCollector.bestEffort, intSg)
new MockStore[Int, Int](storeExceptionFut), ins)(intSg)
result.values.forall { v =>
Await.result(v.liftToTry).throwable == missingException
Await.result(v.liftToTry).throwable == storeException
}
}
}

property("multiMergeFromMultiSet returns previously stored value correctly " +
"with bestEffort FutureCollector") = {
property("multiMergeFromMultiSet returns previously stored value correctly") = {
val intSg = implicitly[Semigroup[Int]]
forAll { (ins: Map[Int, Int]) =>
val store = newStore[Int, Int]
Expand All @@ -218,11 +216,9 @@ object MergeableStoreProperties extends Properties("MergeableStore") {
* We merge the entire input map twice into the store. It should return values
* stored the first time in the second call as old values.
*/
MergeableStore.multiMergeFromMultiSet(store, ins)(FutureCollector.bestEffort, intSg)
val result = MergeableStore
.multiMergeFromMultiSet(store, ins)(FutureCollector.bestEffort, intSg)
MergeableStore.multiMergeFromMultiSet(store, ins)(intSg)
val result = MergeableStore.multiMergeFromMultiSet(store, ins)(intSg)
ins.forall { case (k, v) => Await.result(result(k)) == Some(v) }
}

}
}