Skip to content

Commit

Permalink
Merge pull request #195 from twitter/feature/calendarQueryStrategy2
Browse files Browse the repository at this point in the history
Adds the CalendarTimeStrategy
  • Loading branch information
johnynek committed Dec 16, 2013
2 parents 05d36f1 + 07a76c5 commit a4edba9
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 1 deletion.
4 changes: 3 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object StorehausBuild extends Build {
val algebirdVersion = "0.3.1"
val bijectionVersion = "0.6.0"
val utilVersion = "6.3.7"
val scaldingVersion = "0.9.0rc4"

lazy val storehaus = Project(
id = "storehaus",
Expand Down Expand Up @@ -162,7 +163,8 @@ object StorehausBuild extends Build {
lazy val storehausAlgebra = module("algebra").settings(
libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion,
libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion,
libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion
libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion,
libraryDependencies += "com.twitter" %% "scalding-date" % scaldingVersion
).dependsOn(storehausCore % "test->test;compile->compile")

lazy val storehausMemcache = module("memcache").settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra.query

import com.twitter.scalding.{RichDate, DateRange, Duration, AbsoluteDuration, Years, Months, Weeks, Days, Hours, Minutes}
import java.util.TimeZone
import scala.annotation.tailrec

/** Identifies one calendar bucket produced by [[CalendarTimeStrategy]].
  *
  * @param typeIndx  position of the bucket's duration in the strategy's list of
  *                  durations (the strategy resolves it via `indexToDuration`)
  * @param startTime timestamp of the start of the bucket, in milliseconds
  */
case class CalendarBucket(
  typeIndx: Int,
  startTime: Long
)

/** A query strategy for time that covers a range with calendar-aligned buckets.
  *
  * Five bucket granularities are used -- one year, one month, one day, one hour
  * and one minute -- all constructed with `strategyTimezone`. A [[CalendarBucket]]
  * refers to its granularity by position in that list (0 = year ... 4 = minute).
  *
  * @param strategyTimezone time zone handed to the calendar durations; UTC by default
  */
class CalendarTimeStrategy(strategyTimezone: TimeZone = TimeZone.getTimeZone("UTC")) extends QueryStrategy[DateRange, Long, CalendarBucket] {

  // Ordered coarsest to finest; the list position doubles as CalendarBucket.typeIndx.
  private val allBuckets = {
    implicit val tz = strategyTimezone
    List(Years(1), Months(1), Days(1), Hours(1), Minutes(1))
  }

  /** Length of `bucket` in milliseconds: the bucket's start advanced by its
    * calendar duration, minus the start itself.
    */
  def bucketLength(bucket: CalendarBucket) = {
    val begin = RichDate(bucket.startTime)
    val finish = indexToDuration(bucket.typeIndx).addTo(begin)
    finish.timestamp - bucket.startTime
  }

  /** Looks up the duration stored at position `indx` of the granularity list. */
  protected def indexToDuration(indx: Int) = allBuckets(indx)

  /** Simple name of the duration's runtime class (the text after the last '.'). */
  protected def durationToName(x: Duration): String = x.getClass.getName.split('.').last

  // One bucket per granularity, each starting at the floor of the timestamp
  // for that granularity.
  private def tsToBuckets(msSinceEpoch: Long) = {
    val instant = RichDate(msSinceEpoch)
    val perGranularity =
      for ((granularity, position) <- allBuckets.zipWithIndex)
        yield CalendarBucket(position, granularity.floorOf(instant).timestamp)
    perGranularity.toSet
  }

  // Length of a range counting both endpoints (hence the + 1 ms).
  private def len(dr: DateRange) = {
    val inclusiveMillis = dr.end.timestamp - dr.start.timestamp + 1L
    AbsoluteDuration.fromMillisecs(inclusiveMillis)
  }

  // True when `child` does not overlap `filterDR`; touching only at an
  // endpoint counts as outside.
  private def outsideRange(filterDR: DateRange, child: DateRange) =
    !(child.start < filterDR.end && child.end > filterDR.start)

  /** Buckets covering `dr`, found by walking the granularities from year down to minute. */
  def query(dr: DateRange): Set[CalendarBucket] =
    extract(dr, Set(dr), 0, allBuckets, Set.empty[CalendarBucket])

  /** One refinement step per remaining duration.
    *
    * Every pending range is widened to whole multiples of the current duration
    * and split with `each`. Pieces lying completely inside `filterDr` become
    * buckets of type `curIndx`; pieces that only partly overlap it are carried
    * to the next duration. When no durations remain the accumulated buckets
    * are returned.
    */
  @tailrec
  private def extract(filterDr: DateRange, drSet: Set[DateRange], curIndx: Int, remainingDurations: List[Duration], acc: Set[CalendarBucket]): Set[CalendarBucket] =
    remainingDurations match {
      case granularity :: finer =>
        val candidates: Set[DateRange] = drSet.flatMap { pending =>
          val widened = DateRange(granularity.floorOf(pending.start), granularity.floorOf(pending.end) + granularity)
          // Keep only pieces that overlap the query and are longer than 1 ms
          // (presumably the 1 ms piece is the tail left at the widened end -- confirm
          // against DateRange.each).
          widened
            .each(granularity)
            .filter(piece => !outsideRange(filterDr, piece) && len(piece).toMillisecs > 1L)
        }
        // Fully contained pieces are emitted now; the rest need a finer granularity.
        val (covered, straddling) = candidates.partition(filterDr.contains(_))
        val emitted = covered.map(piece => CalendarBucket(curIndx, piece.start.timestamp))
        extract(filterDr, straddling, curIndx + 1, finer, acc ++ emitted)
      case Nil => acc
    }

  /** Every bucket (one per granularity) that the timestamp `ts` falls into. */
  def index(ts: Long): Set[CalendarBucket] = tsToBuckets(ts)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra.query


import org.scalacheck.{ Gen, Arbitrary, Properties }
import org.scalacheck.Prop._
import org.scalacheck.Properties
import com.twitter.scalding._
import scala.collection.breakOut

/** ScalaCheck properties for [[CalendarTimeStrategy]] over generated date ranges. */
object CalendarTimeQueryRangeProperties extends Properties("CalendarTimeStrategy") {
  /** Length of `dr` with the end treated as exclusive (end - start, no + 1). */
  def exclusiveUpperLen(dr: DateRange) = AbsoluteDuration.fromMillisecs(dr.end.timestamp - dr.start.timestamp )

  // One minute in milliseconds: the alignment applied to generated timestamps.
  private val MinuteMillis = 1000L * 60

  /** Rounds `ts` down to a multiple of `floorSize` (one minute by default). */
  def floor(ts:Long, floorSize: Long = 1000L * 60) = (ts / floorSize) * floorSize

  // 80 years of 365 days, in milliseconds.
  val TOP_DATE_SPAN = 1000L * 3600 * 24 * 365 * 80

  /** Minute-aligned ranges that span at least one minute.
    *
    * The lower bound on `endDelta` matters: anything under a minute floors to
    * zero and yields a range with start == end, for which the "must have an
    * entry" property below cannot hold.
    */
  implicit val arbDR:Arbitrary[DateRange] = Arbitrary(for {
    start <- Gen.choose(0L, TOP_DATE_SPAN)
    endDelta <- Gen.choose(MinuteMillis, TOP_DATE_SPAN/2)
  } yield {
    val startTS = floor(start)
    val endTS = startTS + floor(endDelta)
    DateRange(RichDate(startTS), RichDate(endTS))
  })

  // Exactly one of the buckets the range start indexes into must be part of
  // the query result for that range.
  property("Index timerange must have an entry in the query") = forAll { dr: DateRange =>
    val strategy = new CalendarTimeStrategy
    val buckets = strategy.query(dr)
    val queryBuckets = strategy.index(dr.start.timestamp)
    (queryBuckets & buckets).size == 1
  }

  // The lengths of the returned buckets must sum to the length of the range.
  property("Time QueryRange Adds back up tot the expected total") = forAll { dr: DateRange =>
    val strategy = new CalendarTimeStrategy
    val timeLen = exclusiveUpperLen(dr).toMillisecs
    val buckets = strategy.query(dr)
    // Convert to a List before mapping: mapping the Set directly would collapse
    // buckets that happen to have equal lengths.
    val reSummed = buckets.toList.map(strategy.bucketLength(_)).sum
    reSummed == timeLen
  }
}

0 comments on commit a4edba9

Please sign in to comment.