Skip to content

Commit

Permalink
Merge pull request #230 from luckysong/develop
Browse files Browse the repository at this point in the history
Share the Retrying Read Write store in storehaus repo
  • Loading branch information
johnynek committed May 30, 2014
2 parents aaeab18 + f428735 commit 5f6d135
Showing 1 changed file with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.storehaus

import com.twitter.conversions.time._
import com.twitter.util.{ Duration, Future, Return, Throw, Timer }

/**
Expand Down Expand Up @@ -52,6 +53,35 @@ class RetryingReadableStore[-K, +V](store: ReadableStore[K, V], backoffs: Iterab
class RetryingStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(pred: Option[V] => Boolean)(implicit timer: Timer)
extends RetryingReadableStore[K, V](store, backoffs)(pred)
with Store[K, V] {
override def put(kv: (K, Option[V])) = store.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = store.multiPut(kvs)

private val padded_backoffs = backoffs ++ Seq(0.second)

private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = {
if (!futures.hasNext) {
Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find"))
} else {
val (next, delay) = futures.next()
if (!futures.hasNext) {
next
} else {
next.filter(pred).rescue {
case e: Exception =>
timer.doLater(delay)(()) flatMap { _ =>
find(futures)(pred)
}
}
}
}
}

override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
store.multiPut(kvs) map { case (k, future) =>
val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))})
.zip(padded_backoffs.iterator)
(k, find(retryStream) { t => true })
}
}

override def put(kv: (K, Option[V])) = multiPut(Map(kv)).apply(kv._1)
}

0 comments on commit 5f6d135

Please sign in to comment.