Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for incorrect error message in MergeableStoreViaGetPut #334

Merged
merged 16 commits into from
Dec 19, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package com.twitter.storehaus.algebra

import com.twitter.algebird.{Monoid, Semigroup}
import com.twitter.bijection.ImplicitBijection
import com.twitter.storehaus.{FutureCollector, FutureOps, Store}
import com.twitter.util.{Future, Promise, Return, Throw, Try}
import com.twitter.storehaus.{FutureCollector, FutureOps, MissingValueException, Store}
import com.twitter.util.{Future, Promise, Return, Throw}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReferenceArray}
import scala.language.implicitConversions
import scala.reflect.ClassTag

/** Main trait to represent stores that are used for aggregation */
trait MergeableStore[-K, V] extends Store[K, V] with Mergeable[K, V]
Expand All @@ -32,6 +31,7 @@ object MergeableStore {
implicit def enrich[K, V](store: MergeableStore[K, V]): EnrichedMergeableStore[K, V] =
new EnrichedMergeableStore(store)

// todo(pankajg) After merging latest algebird replace this with Semigroup.maybePlus
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not use todo and instead file issues. We don't always close issues, but we approximately never address todos.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Not sure if filing a jira before the change is merged would be a good idea. Will file the jira after this change merges. This is a bit of problem with filing jiras in such cases, it is something that would need to be kept track separately.

private[this] def addOpt[V](init: Option[V], inc: V)(implicit sg: Semigroup[V]): Option[V] = init match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. the latest algebird has Semigroup.maybePlus this does this, but it always returns V not Option[V] which I think is correct (you can just box in Some as needed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Don't want to merge latest algebird in this change. Is it ok to simplify this separate review where I can merge algebird. I've left a todo for now.

case Some(i) => Some(sg.plus(i, inc))
case None => Some(inc)
Expand All @@ -57,66 +57,73 @@ object MergeableStore {
val (collectedMGetResult, getFailures) = collectWithFailures(mGetResult)

val mPutResultFut: Future[Map[K, Future[Option[V]]]] =
collectedMGetResult.map { pairs: Seq[(K, (Option[V], Option[V]))] =>
collectedMGetResult.map { pairs: Array[(K, (Option[V], Option[V]))] =>
val pairMap = pairs.toMap
val mPutResult: Map[K, Future[Unit]] = store.multiPut(pairMap.mapValues(_._2))
mPutResult.map { case (k, funit) =>
(k, funit.map { _ => pairMap(k)._1 })
}
}

val missingFn: K => Future[Option[V]] = { k =>
getFailures.flatMap { failures =>
Future.exception(failures.getOrElse(k, new MissingValueException[K](k)))
}
}

/**
* Combine original keys with result after put.
* Combine original keys with result after put and errors with get.
*/
FutureOps.liftFutureValues(keySet, mPutResultFut, getFailures)
FutureOps.liftFutureValues(keySet, mPutResultFut, missingFn)
}

/**
* Collects keyed futures, partitioning out the failures.
*/
private[algebra] def collectWithFailures[K, V](fs:Map[K, Future[V]]): (Future[Seq[(K, V)]], Map[K, Future[Nothing]]) = {
private[algebra] def collectWithFailures[K, V](fs: Map[K, Future[V]]): (Future[Array[(K, V)]], Future[Map[K, Throwable]]) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at this again it seems like rather than Either[Throwable, V] we could just use Try[V] which is basically the same.

Then this method signature could be:
def collectWithFailures[K, V](fs: Map[K, Future[V]]): Future[Map[K, Try[V]]] which actually seems useful enough to have even in twitter util.

In any case, the first thing you do when you get the result is call .toMap on the array, so returning the Map seems to make sense.

Also, both of these futures are satisfied at the same time, right? So, even if you don't merge the data structures, you could do: Future[(Map[K, V], Map[K, Throwable])] so it is clear that all the data comes in at one time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of Either[Throwable, V] to Try[V] is great, indeed Try is basically the same in this context and simpler.

Combining successful and failed results will be inefficient since we need to immediately do multiput for the successful ones and so need them separately. By passing them both back in a single future we have to split them up again immediately after. I do agree that returning a single future conveys the behavior more correctly. It feels like a question of performance and code simplicity vs indication of correct intent. I've implemented the return with single future PTAL.

if (fs.isEmpty) {
(Future.value(Seq.empty[(K, V)]), Map.empty[K, Future[Nothing]])
(Future.value(Array.empty[(K, V)]), Future.value(Map.empty[K, Throwable]))
} else {
val fsSize = fs.size
val results = new AtomicReferenceArray[Either[(K,Future[Nothing]), (K, V)]](fsSize)
val results = new AtomicReferenceArray[(K, Either[Throwable, V])](fsSize)
val countdown = new AtomicInteger(fsSize)
val successCount = new AtomicInteger(0)
val pSuccess = new Promise[Seq[(K, V)]]
var failures = Map.empty[K, Future[Nothing]]
val pSuccess = new Promise[Array[(K, V)]]
val pFailures = new Promise[Map[K, Throwable]]

def collectResults() = {
if (countdown.decrementAndGet() == 0) {
val successArray = new Array[(K, V)](successCount.get())
var si = 0
var ri = 0
var failures = Map.empty[K, Throwable]
while (ri < fsSize) {
results.get(ri) match {
case Right(kv) =>
successArray(si) = kv
case (k, Right(v)) =>
successArray(si) = k -> v
si += 1
case Left(kv) =>
failures = failures + kv
case (k, Left(t)) =>
failures = failures + (k -> t)
}
ri += 1
}
pSuccess.setValue(successArray)
pFailures.setValue(failures)
}
}

for (((k, f), i) <- fs.iterator.zipWithIndex) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are going to geek out on while loops above, doing the same here is probably worth it:

val mapIt = fs.iterator
var i = 0
while(mapIt.hasNext) {
  val kv = mapIt.next
  fv._2.respond {
    case Return(v) =>...
  }
  i += 0
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this, it does add a bit of complexity. Also does avoid one wrapping/unwraping for each item in the map.

f respond {
case Return(v) =>
results.set(i, Right(k -> v))
results.set(i, k -> Right(v))
successCount.incrementAndGet()
collectResults()
case t@Throw(cause) =>
val failure = k -> Future.const(Throw(cause))
results.set(i, Left(failure))
results.set(i, k -> Left(cause))
collectResults()
}
}
(pSuccess, failures)
(pSuccess, pFailures)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package com.twitter.storehaus.algebra

import com.twitter.algebird.{ MapAlgebra, Semigroup, Monoid }
import com.twitter.algebird.{MapAlgebra, Monoid, Semigroup}
import com.twitter.bijection.Injection
import com.twitter.storehaus._
import com.twitter.util.{Await, Future}
import org.scalacheck.{Prop, Arbitrary, Properties}
import com.twitter.util.{Await, Duration, Future}
import org.scalacheck.{Arbitrary, Prop, Properties}
import org.scalacheck.Prop._

import scala.collection.breakOut

object MergeableStoreProperties extends Properties("MergeableStore") {
Expand All @@ -41,6 +40,8 @@ object MergeableStoreProperties extends Properties("MergeableStore") {
}
}

implicit val timer = new com.twitter.util.ScheduledThreadPoolTimer(makeDaemons = true)

implicit def mapEquiv[K, V: Monoid: Equiv]: Equiv[Map[K, V]] = {
Equiv.fromFunction { (m1, m2) =>
val cleanM1 = MapAlgebra.removeZeros(m1)
Expand Down Expand Up @@ -221,4 +222,25 @@ object MergeableStoreProperties extends Properties("MergeableStore") {
ins.forall { case (k, v) => Await.result(result(k)) == Some(v) }
}
}

property("collectWithFailures should collect successes correctly") = {
forAll { ins: Map[Int, Int] =>
val fSleep = Future.sleep(Duration.fromMilliseconds(10))
val futures = ins.mapValues(x => fSleep.map(_ => x))
val (fSuccess, _) = MergeableStore.collectWithFailures(futures)
val ss = Await.result(fSuccess)
ss.size == futures.size && ss.toSeq.sorted == ins.toSeq.sorted
}
}

property("collectWithFailures should collect failures correctly") = {
forAll { ins: Map[Int, Int] =>
val throwable = new RuntimeException
val fSleep = Future.sleep(Duration.fromMilliseconds(10))
val futures = ins.mapValues(_ => fSleep before Future.exception(throwable))
val (_, fFailures) = MergeableStore.collectWithFailures(futures)
val fails = Await.result(fFailures)
fails.size == futures.size && fails.map(_._2).forall(_ == throwable)
}
}
}