From 2543bd6edd10521eb40a5bbf8958e1840fa0b1c0 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 13 Dec 2013 17:32:04 -0800 Subject: [PATCH 1/2] Adds the CalendarTimeStrategy --- project/Build.scala | 4 +- .../query/CalendarTimeQueryRange.scala | 85 +++++++++++++++++++ .../CalendarTimeQueryRangeProperties.scala | 55 ++++++++++++ 3 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala create mode 100644 storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala diff --git a/project/Build.scala b/project/Build.scala index d7d53a28..9b1611ec 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -118,6 +118,7 @@ object StorehausBuild extends Build { val algebirdVersion = "0.3.1" val bijectionVersion = "0.6.0" val utilVersion = "6.3.7" + val scaldingVersion = "0.9.0rc4" lazy val storehaus = Project( id = "storehaus", @@ -160,7 +161,8 @@ object StorehausBuild extends Build { lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion + libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion, + libraryDependencies += "com.twitter" %% "scalding-date" % scaldingVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala new file mode 100644 index 00000000..52c20666 --- /dev/null +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.algebra.query + +import com.twitter.scalding.{RichDate, DateRange, Duration, AbsoluteDuration, Years, Months, Weeks, Days, Hours, Minutes} +import java.util.TimeZone +import scala.annotation.tailrec + +case class Bucket(typeIndx: Int, startTime: Long) { + override def toString = Buckets.durationToName(Buckets.indexToDuration(typeIndx)) + " starting at : " + RichDate(startTime) + "Indx is : " + typeIndx + def length: Long = { + val endTime = Buckets.indexToDuration(typeIndx).addTo(RichDate(startTime)).timestamp + endTime - startTime + } +} +object Buckets { + private val allBuckets = { + implicit val tz = TimeZone.getTimeZone("UTC") + List(Years(1), Months(1), Days(1), Hours(1), Minutes(1)) + } + + def indexToDuration(indx: Int) = allBuckets(indx) + + def durationToName(x: Duration): String = x.getClass.getName.split('.').reverse.head + + def get = allBuckets + // def getDuration(bucket: Bucket) = allBuckets(bucket.indx) + // def getName(bucket: Bucket) = durationToName(allBuckets(bucket.indx)) + def tsToBuckets(msSinceEpoch: Long) = { + val richDate = RichDate(msSinceEpoch) + allBuckets.zipWithIndex.map { case (duration, indx) => + Bucket(indx, duration.floorOf(richDate).timestamp) + }.toSet + } +} + +/** A query strategy for time with named buckets. + */ +class CalendarTimeStrategy extends QueryStrategy[DateRange, Long, Bucket] { + private def len(dr: DateRange) = AbsoluteDuration.fromMillisecs(dr.end.timestamp - dr.start.timestamp + 1L) + + private def outsideRange(filterDR: DateRange, child: DateRange) = + (child.start >= filterDR.end || child.end <= filterDR.start) + + def query(dr: DateRange): Set[Bucket] = extract(dr, Set(dr), 0, Buckets.get, Set[Bucket]()) + + @tailrec + private def extract(filterDr: DateRange, drSet: Set[DateRange], curIndx: Int, remainingDurations: List[Duration], acc: Set[Bucket]): Set[Bucket] = { + remainingDurations match { + case Nil => acc + case head :: tail => + // expand the DR + val expandedOut = drSet.map{ dr => + DateRange(head.floorOf(dr.start), head.floorOf(dr.end) + head) + .each(head) + .filter(!outsideRange(filterDr, _)) + .filter(len(_).toMillisecs > 1L) + .toSet + }.foldLeft(Set[DateRange]()){_ ++ _} + // Things which only partially fit in this time range + val others = expandedOut.filter(!filterDr.contains(_)) + // Things which fit fully in this time range + val fullyInRange = expandedOut + .filter(filterDr.contains(_)) + .map(x => Bucket(curIndx, x.start.timestamp)) + extract(filterDr, others, curIndx + 1, tail, acc ++ fullyInRange) + } + } + + def index(ts: Long) = Buckets.tsToBuckets(ts) +} diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala new file mode 100644 index 00000000..e1ea2d52 --- /dev/null +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.algebra.query + + +import org.scalacheck.{ Gen, Arbitrary, Properties } +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import com.twitter.scalding._ +import scala.collection.breakOut + +object CalendarTimeQueryRangeProperties extends Properties("CalendarTimeStrategy") { + def exclusiveUpperLen(dr: DateRange) = AbsoluteDuration.fromMillisecs(dr.end.timestamp - dr.start.timestamp ) + + def floor(ts:Long, floorSize: Long = 1000L * 60) = (ts / floorSize) * floorSize + + val TOP_DATE_SPAN = 1000L * 3600 * 24 * 365 * 80 + implicit val arbDR:Arbitrary[DateRange] = Arbitrary(for { + start <- Gen.choose(0, TOP_DATE_SPAN) + endDelta <- Gen.choose(0, TOP_DATE_SPAN/2) + } yield { + val startTS = floor(start) + val endTS = startTS + floor(endDelta) + DateRange(RichDate(startTS), RichDate(endTS)) + }) + + property("Index timerange must have an entry in the query") = forAll { dr: DateRange => + val strategy = new CalendarTimeStrategy + val buckets = strategy.query(dr) + val queryBuckets = strategy.index(dr.start.timestamp) + (queryBuckets & buckets).size == 1 + } + + property("Time QueryRange Adds back up tot the expected total") = forAll { dr: DateRange => + val strategy = new CalendarTimeStrategy + val timeLen = exclusiveUpperLen(dr).toMillisecs + val buckets = strategy.query(dr) + val reSummed = buckets.toList.map(_.length).foldLeft(0L){_ + _} + reSummed == timeLen + } +} From 07a76c58c32dee7c9e88985154acf02f8a4720a7 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Sat, 14 Dec 2013 22:50:29 -0800 Subject: [PATCH 2/2] Review comments --- .../query/CalendarTimeQueryRange.scala | 43 ++++++++----------- .../CalendarTimeQueryRangeProperties.scala | 2 +- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala index 52c20666..20ebcae8 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRange.scala @@ -20,46 +20,41 @@ import com.twitter.scalding.{RichDate, DateRange, Duration, AbsoluteDuration, Ye import java.util.TimeZone import scala.annotation.tailrec -case class Bucket(typeIndx: Int, startTime: Long) { - override def toString = Buckets.durationToName(Buckets.indexToDuration(typeIndx)) + " starting at : " + RichDate(startTime) + "Indx is : " + typeIndx - def length: Long = { - val endTime = Buckets.indexToDuration(typeIndx).addTo(RichDate(startTime)).timestamp - endTime - startTime - } -} -object Buckets { +case class CalendarBucket(typeIndx: Int, startTime: Long) + +/** A query strategy for time with named buckets. + */ +class CalendarTimeStrategy(strategyTimezone: TimeZone = TimeZone.getTimeZone("UTC")) extends QueryStrategy[DateRange, Long, CalendarBucket] { + private val allBuckets = { - implicit val tz = TimeZone.getTimeZone("UTC") + implicit val tz = strategyTimezone List(Years(1), Months(1), Days(1), Hours(1), Minutes(1)) } - def indexToDuration(indx: Int) = allBuckets(indx) + def bucketLength(bucket: CalendarBucket) = { + val endTime = indexToDuration(bucket.typeIndx).addTo(RichDate(bucket.startTime)).timestamp + endTime - bucket.startTime + } - def durationToName(x: Duration): String = x.getClass.getName.split('.').reverse.head + protected def indexToDuration(indx: Int) = allBuckets(indx) + protected def durationToName(x: Duration): String = x.getClass.getName.split('.').reverse.head - def get = allBuckets - // def getDuration(bucket: Bucket) = allBuckets(bucket.indx) - // def getName(bucket: Bucket) = durationToName(allBuckets(bucket.indx)) - def tsToBuckets(msSinceEpoch: Long) = { + private def tsToBuckets(msSinceEpoch: Long) = { val richDate = RichDate(msSinceEpoch) allBuckets.zipWithIndex.map { case (duration, indx) => - Bucket(indx, duration.floorOf(richDate).timestamp) + CalendarBucket(indx, duration.floorOf(richDate).timestamp) }.toSet } -} -/** A query strategy for time with named buckets. - */ -class CalendarTimeStrategy extends QueryStrategy[DateRange, Long, Bucket] { private def len(dr: DateRange) = AbsoluteDuration.fromMillisecs(dr.end.timestamp - dr.start.timestamp + 1L) private def outsideRange(filterDR: DateRange, child: DateRange) = (child.start >= filterDR.end || child.end <= filterDR.start) - def query(dr: DateRange): Set[Bucket] = extract(dr, Set(dr), 0, Buckets.get, Set[Bucket]()) + def query(dr: DateRange): Set[CalendarBucket] = extract(dr, Set(dr), 0, allBuckets, Set[CalendarBucket]()) @tailrec - private def extract(filterDr: DateRange, drSet: Set[DateRange], curIndx: Int, remainingDurations: List[Duration], acc: Set[Bucket]): Set[Bucket] = { + private def extract(filterDr: DateRange, drSet: Set[DateRange], curIndx: Int, remainingDurations: List[Duration], acc: Set[CalendarBucket]): Set[CalendarBucket] = { remainingDurations match { case Nil => acc case head :: tail => @@ -76,10 +71,10 @@ class CalendarTimeStrategy extends QueryStrategy[DateRange, Long, Bucket] { // Things which fit fully in this time range val fullyInRange = expandedOut .filter(filterDr.contains(_)) - .map(x => Bucket(curIndx, x.start.timestamp)) + .map(x => CalendarBucket(curIndx, x.start.timestamp)) extract(filterDr, others, curIndx + 1, tail, acc ++ fullyInRange) } } - def index(ts: Long) = Buckets.tsToBuckets(ts) + def index(ts: Long):Set[CalendarBucket] = tsToBuckets(ts) } diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala index e1ea2d52..a331aeed 100644 --- a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/query/CalendarTimeQueryRangeProperties.scala @@ -49,7 +49,7 @@ object CalendarTimeQueryRangeProperties extends Properties("CalendarTimeStrategy val strategy = new CalendarTimeStrategy val timeLen = exclusiveUpperLen(dr).toMillisecs val buckets = strategy.query(dr) - val reSummed = buckets.toList.map(_.length).foldLeft(0L){_ + _} + val reSummed = buckets.toList.map(strategy.bucketLength(_)).foldLeft(0L){_ + _} reSummed == timeLen } }