From 7850ad3dae498ad6d46a8fbbb2ce9ff71ff6cfd0 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 14:44:53 -0800 Subject: [PATCH 1/6] add logic for replicating writes and reads to stores --- .../storehaus/algebra/StoreAlgebra.scala | 2 +- .../scala/com/twitter/storehaus/Store.scala | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala index 8a17c2e1..82e1928a 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala @@ -65,7 +65,7 @@ class AlgebraicStore[StoreType <: Store[StoreType, K, V], K, V](store: StoreType * Store which aggregates values added with + into the existing value in the store. * If addition ever results in a zero value, the key is deleted from the store. */ -class AggregatingStore[StoreType <: Store[StoreType, K, V], K, V: Monoid](store: StoreType) +class AggregatingStore[StoreType <: Store[StoreType, K, V], K, V: Monoid](store: Store[StoreType, K, V]) extends Store[AggregatingStore[StoreType, K, V], K, V] { override def get(k: K) = store.get(k) override def multiGet(ks: Set[K]) = store.multiGet(ks) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala index 19deefd4..50758360 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala @@ -72,16 +72,31 @@ object Store { } } -trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { - def -(k: K): Future[Self] - def +(pair: (K,V)): Future[Self] - def update(k: K)(fn: Option[V] => Option[V]): Future[Self] = { +trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { self => + def -(k: K): Future[Store[Self,K,V]] + def +(pair: (K,V)): Future[Store[Self,K,V]] + def update(k: K)(fn: Option[V] => Option[V]): Future[Store[Self,K,V]] = { get(k) flatMap { opt: Option[V] => fn(opt) .map { v => this + (k -> v) } .getOrElse(this - k) } } + + /** + * Replicates writes to both stores, and uses the "or" logic defined on ReadableStore to replicate reads. + */ + def combinedWith(other: Store[Self, K, V]): Store[Self, K, V] = { + new Store[Self, K, V] { combined => + lazy val readableStore = this.or(other) + override def get(k: K) = readableStore.get(k) + override def multiGet(ks: Set[K]) = readableStore.multiGet(ks) + override def update(k: K)(fn: Option[V] => Option[V]) = + self.update(k)(fn).join(other.update(k)(fn)).map { _ => combined } + override def -(k: K) = (self - k).join(other - k).map { _ => combined} + override def +(pair: (K,V)) = (self + pair).join(other + pair).map { _ => combined} + } + } } object KeysetStore { From 5b6839ce8dc8e673223ccfd789ecca9fa3779618 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 15:00:50 -0800 Subject: [PATCH 2/6] move to replicate fn --- .../scala/com/twitter/storehaus/Store.scala | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala index 50758360..b4b27c30 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala @@ -70,6 +70,22 @@ object Store { } }) } + + /** + * Replicates writes to all stores, and takes the first successful read. + */ + def replicate[Self <: Store[Self, K, V], K, V](stores: Seq[Store[Self, K, V]]): Store[Self, K, V] = { + import Store.{selectFirstSuccessfulTrial => selectFirst} + new Store[Self, K, V] { combined => + override def get(k: K) = selectFirst(stores.map { _.get(k) }) + override def multiGet(ks: Set[K]) = selectFirst(stores.map { _.multiGet(ks) }) + override def update(k: K)(fn: Option[V] => Option[V]) = + Future.collect(stores.map { _.update(k)(fn) }).map { _ => combined } + override def -(k: K) = Future.collect(stores.map { _ - k }).map { _ => combined } + override def +(pair: (K,V)) = Future.collect(stores.map { _ + pair }).map { _ => combined } + } + } + } trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { self => @@ -82,21 +98,6 @@ trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { self => .getOrElse(this - k) } } - - /** - * Replicates writes to both stores, and uses the "or" logic defined on ReadableStore to replicate reads. - */ - def combinedWith(other: Store[Self, K, V]): Store[Self, K, V] = { - new Store[Self, K, V] { combined => - lazy val readableStore = this.or(other) - override def get(k: K) = readableStore.get(k) - override def multiGet(ks: Set[K]) = readableStore.multiGet(ks) - override def update(k: K)(fn: Option[V] => Option[V]) = - self.update(k)(fn).join(other.update(k)(fn)).map { _ => combined } - override def -(k: K) = (self - k).join(other - k).map { _ => combined} - override def +(pair: (K,V)) = (self + pair).join(other + pair).map { _ => combined} - } - } } object KeysetStore { From 5b2b92a3fcc4b7ccc0599687d628cd887d54fe49 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 15:02:15 -0800 Subject: [PATCH 3/6] remove useless self --- storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala index b4b27c30..f77142c2 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala @@ -88,7 +88,7 @@ object Store { } -trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { self => +trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { def -(k: K): Future[Store[Self,K,V]] def +(pair: (K,V)): Future[Store[Self,K,V]] def update(k: K)(fn: Option[V] => Option[V]): Future[Store[Self,K,V]] = { From cb8e4cd95c790da8b2cab15c1ff3b97efe7f5eb4 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 15:29:26 -0800 Subject: [PATCH 4/6] switched to ReplicatedStore --- .../storehaus/algebra/StoreAlgebra.scala | 2 +- .../twitter/storehaus/ReplicatedStore.scala | 20 +++++++++++++++++ .../scala/com/twitter/storehaus/Store.scala | 22 +++---------------- 3 files changed, 24 insertions(+), 20 deletions(-) create mode 100644 storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala index 82e1928a..8a17c2e1 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/StoreAlgebra.scala @@ -65,7 +65,7 @@ class AlgebraicStore[StoreType <: Store[StoreType, K, V], K, V](store: StoreType * Store which aggregates values added with + into the existing value in the store. * If addition ever results in a zero value, the key is deleted from the store. */ -class AggregatingStore[StoreType <: Store[StoreType, K, V], K, V: Monoid](store: Store[StoreType, K, V]) +class AggregatingStore[StoreType <: Store[StoreType, K, V], K, V: Monoid](store: StoreType) extends Store[AggregatingStore[StoreType, K, V], K, V] { override def get(k: K) = store.get(k) override def multiGet(ks: Set[K]) = store.multiGet(ks) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala new file mode 100644 index 00000000..ff723169 --- /dev/null +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala @@ -0,0 +1,20 @@ +package com.twitter.storehaus + +import com.twitter.util.Future + +import Store.{selectFirstSuccessfulTrial => selectFirst} + +/** + * Replicates writes to all stores, and takes the first successful read. + */ +class ReplicatedStore[StoreType <: Store[StoreType, K, V], K, V](stores: Seq[StoreType]) + extends Store[ReplicatedStore[StoreType, K, V], K, V] { + override def get(k: K) = selectFirst(stores.map { _.get(k) }) + override def multiGet(ks: Set[K]) = selectFirst(stores.map { _.multiGet(ks) }) + override def update(k: K)(fn: Option[V] => Option[V]) = + Future.collect(stores.map { _.update(k)(fn) }).map { new ReplicatedStore(_) } + override def -(k: K) = + Future.collect(stores.map { _ - k }).map { new ReplicatedStore(_) } + override def +(pair: (K,V)) = + Future.collect(stores.map { _ + pair }).map { new ReplicatedStore(_) } +} \ No newline at end of file diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala index f77142c2..19deefd4 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/Store.scala @@ -70,28 +70,12 @@ object Store { } }) } - - /** - * Replicates writes to all stores, and takes the first successful read. - */ - def replicate[Self <: Store[Self, K, V], K, V](stores: Seq[Store[Self, K, V]]): Store[Self, K, V] = { - import Store.{selectFirstSuccessfulTrial => selectFirst} - new Store[Self, K, V] { combined => - override def get(k: K) = selectFirst(stores.map { _.get(k) }) - override def multiGet(ks: Set[K]) = selectFirst(stores.map { _.multiGet(ks) }) - override def update(k: K)(fn: Option[V] => Option[V]) = - Future.collect(stores.map { _.update(k)(fn) }).map { _ => combined } - override def -(k: K) = Future.collect(stores.map { _ - k }).map { _ => combined } - override def +(pair: (K,V)) = Future.collect(stores.map { _ + pair }).map { _ => combined } - } - } - } trait Store[Self <: Store[Self,K,V], K, V] extends ReadableStore[K, V] { - def -(k: K): Future[Store[Self,K,V]] - def +(pair: (K,V)): Future[Store[Self,K,V]] - def update(k: K)(fn: Option[V] => Option[V]): Future[Store[Self,K,V]] = { + def -(k: K): Future[Self] + def +(pair: (K,V)): Future[Self] + def update(k: K)(fn: Option[V] => Option[V]): Future[Self] = { get(k) flatMap { opt: Option[V] => fn(opt) .map { v => this + (k -> v) } From 186852b091ac5d237c263d9feeb303ab088343b2 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 15:40:48 -0800 Subject: [PATCH 5/6] added ReplicatedStoreProperties --- .../twitter/storehaus/ReplicatedStoreProperties.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala new file mode 100644 index 00000000..a25a03f3 --- /dev/null +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala @@ -0,0 +1,11 @@ +package com.twitter.storehaus + +import org.scalacheck.Properties + +object ReplicatedStoreProperties extends Properties("ReplicatedStore") { + import StoreProperties.storeTest + + property("ReplicatedStore test") = + storeTest[ReplicatedStore[MapStore[String, Int], String, Int], String, Int](new ReplicatedStore( + Stream.continually(new MapStore[String, Int]()).take(100).toSeq)) +} \ No newline at end of file From 621962a24e7b0c82e2e10902128edd9e282ffa07 Mon Sep 17 00:00:00 2001 From: Ashutosh Singhal Date: Fri, 8 Feb 2013 15:42:39 -0800 Subject: [PATCH 6/6] added license info --- .../com/twitter/storehaus/ReplicatedStore.scala | 16 ++++++++++++++++ .../storehaus/ReplicatedStoreProperties.scala | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala index ff723169..38647b61 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ReplicatedStore.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.twitter.storehaus import com.twitter.util.Future diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala index a25a03f3..44f8dec4 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ReplicatedStoreProperties.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.twitter.storehaus import org.scalacheck.Properties