diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala index 6a941e45..920df147 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala @@ -16,6 +16,7 @@ package com.twitter.storehaus +import com.twitter.conversions.time._ import com.twitter.util.{ Duration, Future, Return, Throw, Timer } /** @@ -52,6 +53,35 @@ class RetryingReadableStore[-K, +V](store: ReadableStore[K, V], backoffs: Iterab class RetryingStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(pred: Option[V] => Boolean)(implicit timer: Timer) extends RetryingReadableStore[K, V](store, backoffs)(pred) with Store[K, V] { - override def put(kv: (K, Option[V])) = store.put(kv) - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = store.multiPut(kvs) + + private val padded_backoffs = backoffs ++ Seq(0.second) + + private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = { + if (!futures.hasNext) { + Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) + } else { + val (next, delay) = futures.next() + if (!futures.hasNext) { + next + } else { + next.filter(pred).rescue { + case e: Exception => + timer.doLater(delay)(()) flatMap { _ => + find(futures)(pred) + } + } + } + } + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + store.multiPut(kvs) map { case (k, future) => + val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) + .zip(padded_backoffs.iterator) + (k, find(retryStream) { t => true }) + } + } + + override def put(kv: (K, Option[V])) = multiPut(Map(kv)).apply(kv._1) } +