From f1a16f6431e682d35c9dcdfeb5c7585a3a2565f6 Mon Sep 17 00:00:00 2001 From: Guanglei Song Date: Thu, 17 Apr 2014 13:49:35 -0700 Subject: [PATCH 1/3] Move RetryingRWStore from twitter internal to external storehaus. --- .../twitter/storehaus/RetryingRWStore.scala | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala new file mode 100644 index 00000000..30239bfb --- /dev/null +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus + +import com.twitter.conversions.time._ +import com.twitter.storehaus.{ Store, StoreProxy } +import com.twitter.finagle.stats.{ Counter, NullStatsReceiver, StatsReceiver } +import com.twitter.util._ + +/** + * Read/Write retrying Store. + * Make this to be shared by all storehaus users. + */ +class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration], + statsReceiver: StatsReceiver = NullStatsReceiver)(implicit timer: Timer) + extends Store[K, V] { + + private lazy val stats = new { + val receiver = statsReceiver.scope("retry_store") + + val multiGetExceptions = receiver.counter("multi_get_exceptions") + val multiPutExceptions = receiver.counter("multi_put_exceptions") + } + + private val padded_backoffs = backoffs ++ Seq(0.second) + + private def find[T](futures: Iterator[(Future[T], Duration)], + errorCounter: Counter)(pred: T => Boolean): Future[T] = { + if (!futures.hasNext) { + Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) + } else { + val (next, delay) = futures.next() + if (!futures.hasNext) { + next + } else { + next.filter(pred).rescue { + case e: Exception => + errorCounter.incr() + timer.doLater(delay)(()) flatMap { _ => + find(futures, errorCounter)(pred) + } + } + } + } + } + + /** + * First do a multiPut on the store. For the futures that fail, chain retry puts after delay. + */ + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + store.multiPut(kvs) map { case (k, future) => + val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) + .zip(padded_backoffs.iterator) + (k, find(retryStream, stats.multiPutExceptions) { t => true }) + } + } + + /** + * First do a multiGet on the store. For the futures that fail, chain retry gets after delay. + */ + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { + store.multiGet(ks) map { case (k, future) => + val retryStream = (Iterator(future) ++ Iterator.continually { store.get(k) }) + .zip(padded_backoffs.iterator) + (k, find(retryStream, stats.multiGetExceptions) { t => true }) + } + } +} From b08a7c33a7c4d7a10345ff5c98b0d1bd7d920f41 Mon Sep 17 00:00:00 2001 From: Guanglei Song Date: Thu, 17 Apr 2014 13:59:08 -0700 Subject: [PATCH 2/3] Share RetryingRWStore to all storehaus users. --- .../twitter/storehaus/RetryingRWStore.scala | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala index 30239bfb..315125ce 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala @@ -17,29 +17,19 @@ package com.twitter.storehaus import com.twitter.conversions.time._ -import com.twitter.storehaus.{ Store, StoreProxy } -import com.twitter.finagle.stats.{ Counter, NullStatsReceiver, StatsReceiver } +import com.twitter.storehaus.Store import com.twitter.util._ /** * Read/Write retrying Store. - * Make this to be shared by all storehaus users. + * Make this available to all storehaus users. */ -class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration], - statsReceiver: StatsReceiver = NullStatsReceiver)(implicit timer: Timer) +class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(implicit timer: Timer) extends Store[K, V] { - private lazy val stats = new { - val receiver = statsReceiver.scope("retry_store") - - val multiGetExceptions = receiver.counter("multi_get_exceptions") - val multiPutExceptions = receiver.counter("multi_put_exceptions") - } - private val padded_backoffs = backoffs ++ Seq(0.second) - private def find[T](futures: Iterator[(Future[T], Duration)], - errorCounter: Counter)(pred: T => Boolean): Future[T] = { + private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = { if (!futures.hasNext) { Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) } else { @@ -49,9 +39,8 @@ class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration], } else { next.filter(pred).rescue { case e: Exception => - errorCounter.incr() timer.doLater(delay)(()) flatMap { _ => - find(futures, errorCounter)(pred) + find(futures)(pred) } } } @@ -65,7 +54,7 @@ class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration], store.multiPut(kvs) map { case (k, future) => val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) .zip(padded_backoffs.iterator) - (k, find(retryStream, stats.multiPutExceptions) { t => true }) + (k, find(retryStream) { t => true }) } } @@ -76,7 +65,7 @@ class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration], store.multiGet(ks) map { case (k, future) => val retryStream = (Iterator(future) ++ Iterator.continually { store.get(k) }) .zip(padded_backoffs.iterator) - (k, find(retryStream, stats.multiGetExceptions) { t => true }) + (k, find(retryStream) { t => true }) } } } From f428735aef6a8caa9367d5d476ade2405771c224 Mon Sep 17 00:00:00 2001 From: Guanglei Song Date: Mon, 26 May 2014 23:27:16 -0700 Subject: [PATCH 3/3] Merge the RetryingRWStore to RetryingStore. --- .../twitter/storehaus/RetryingRWStore.scala | 71 ------------------- .../com/twitter/storehaus/RetryingStore.scala | 34 ++++++++- 2 files changed, 32 insertions(+), 73 deletions(-) delete mode 100644 storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala deleted file mode 100644 index 315125ce..00000000 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingRWStore.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2014 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus - -import com.twitter.conversions.time._ -import com.twitter.storehaus.Store -import com.twitter.util._ - -/** - * Read/Write retrying Store. - * Make this available to all storehaus users. - */ -class RetryingRWStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(implicit timer: Timer) - extends Store[K, V] { - - private val padded_backoffs = backoffs ++ Seq(0.second) - - private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = { - if (!futures.hasNext) { - Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) - } else { - val (next, delay) = futures.next() - if (!futures.hasNext) { - next - } else { - next.filter(pred).rescue { - case e: Exception => - timer.doLater(delay)(()) flatMap { _ => - find(futures)(pred) - } - } - } - } - } - - /** - * First do a multiPut on the store. For the futures that fail, chain retry puts after delay. - */ - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { - store.multiPut(kvs) map { case (k, future) => - val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) - .zip(padded_backoffs.iterator) - (k, find(retryStream) { t => true }) - } - } - - /** - * First do a multiGet on the store. For the futures that fail, chain retry gets after delay. - */ - override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { - store.multiGet(ks) map { case (k, future) => - val retryStream = (Iterator(future) ++ Iterator.continually { store.get(k) }) - .zip(padded_backoffs.iterator) - (k, find(retryStream) { t => true }) - } - } -} diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala index 6a941e45..920df147 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala @@ -16,6 +16,7 @@ package com.twitter.storehaus +import com.twitter.conversions.time._ import com.twitter.util.{ Duration, Future, Return, Throw, Timer } /** @@ -52,6 +53,35 @@ class RetryingReadableStore[-K, +V](store: ReadableStore[K, V], backoffs: Iterab class RetryingStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(pred: Option[V] => Boolean)(implicit timer: Timer) extends RetryingReadableStore[K, V](store, backoffs)(pred) with Store[K, V] { - override def put(kv: (K, Option[V])) = store.put(kv) - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = store.multiPut(kvs) + + private val padded_backoffs = backoffs ++ Seq(0.second) + + private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = { + if (!futures.hasNext) { + Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) + } else { + val (next, delay) = futures.next() + if (!futures.hasNext) { + next + } else { + next.filter(pred).rescue { + case e: Exception => + timer.doLater(delay)(()) flatMap { _ => + find(futures)(pred) + } + } + } + } + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + store.multiPut(kvs) map { case (k, future) => + val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) + .zip(padded_backoffs.iterator) + (k, find(retryStream) { t => true }) + } + } + + override def put(kv: (K, Option[V])) = multiPut(Map(kv)).apply(kv._1) } +