Skip to content

Commit

Permalink
Merge the RetryingRWStore to RetryingStore.
Browse files Browse the repository at this point in the history
  • Loading branch information
Guanglei Song committed May 27, 2014
1 parent b08a7c3 commit f428735
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 73 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.storehaus

import com.twitter.conversions.time._
import com.twitter.util.{ Duration, Future, Return, Throw, Timer }

/**
Expand Down Expand Up @@ -52,6 +53,35 @@ class RetryingReadableStore[-K, +V](store: ReadableStore[K, V], backoffs: Iterab
class RetryingStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(pred: Option[V] => Boolean)(implicit timer: Timer)
extends RetryingReadableStore[K, V](store, backoffs)(pred)
with Store[K, V] {
override def put(kv: (K, Option[V])) = store.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = store.multiPut(kvs)

private val padded_backoffs = backoffs ++ Seq(0.second)

private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = {
if (!futures.hasNext) {
Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find"))
} else {
val (next, delay) = futures.next()
if (!futures.hasNext) {
next
} else {
next.filter(pred).rescue {
case e: Exception =>
timer.doLater(delay)(()) flatMap { _ =>
find(futures)(pred)
}
}
}
}
}

override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
store.multiPut(kvs) map { case (k, future) =>
val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))})
.zip(padded_backoffs.iterator)
(k, find(retryStream) { t => true })
}
}

override def put(kv: (K, Option[V])) = multiPut(Map(kv)).apply(kv._1)
}

0 comments on commit f428735

Please sign in to comment.