diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/QTreeMicroBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/QTreeMicroBenchmark.scala index 1a7dc9a99..25b968cf3 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/QTreeMicroBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/QTreeMicroBenchmark.scala @@ -40,7 +40,7 @@ object QTreeMicroBenchmark { val r = if (q.offset % 2 == 0) null else q val parent = - new QTree[A](nextOffset, nextLevel, q.count, monoid.zero, l, r) + new QTree[A](monoid.zero, nextOffset, nextLevel, q.count, l, r) extendToLevelDoubleBranch(parent, n) } @@ -56,9 +56,9 @@ object QTreeMicroBenchmark { val parent = if (q.offset % 2 == 0) - new QTree[A](nextOffset, nextLevel, q.count, monoid.zero, q, null) + new QTree[A](monoid.zero, nextOffset, nextLevel, q.count, q, null) else - new QTree[A](nextOffset, nextLevel, q.count, monoid.zero, null, q) + new QTree[A](monoid.zero, nextOffset, nextLevel, q.count, null, q) extendToLevelSingleBranch(parent, n) } diff --git a/algebird-core/src/main/scala/com/twitter/algebird/QTree.scala b/algebird-core/src/main/scala/com/twitter/algebird/QTree.scala index af35833b0..ec927ba26 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/QTree.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/QTree.scala @@ -42,21 +42,39 @@ object QTree { /** * level gives a bin size of 2^level. By default the bin size is 1/65536 (level = -16) */ - def apply[A](kv: (Double, A), level: Int = DefaultLevel): QTree[A] = - QTree(math.floor(kv._1 / math.pow(2.0, level)).toLong, + def apply[A](kv: (Double, A), level: Int = DefaultLevel): QTree[A] = { + val offset = math.floor(kv._1 / math.pow(2.0, level)).toLong + require(offset >= 0, "QTree can not accept negative values") + + new QTree(kv._2, + offset, level, 1, - kv._2, - None, - None) + null, + null) + } + + def apply[A](kv: (Long, A)): QTree[A] = { + require(kv._1 >= 0, "QTree can not accept negative values") - def apply[A](kv: (Long, A)): QTree[A] = - QTree(kv._1, + new QTree(kv._2, + kv._1, 0, 1, - kv._2, - None, - None) + null, + null) + } + + def apply[A](offset: Long, + level: Int, + count: Long, + sum: A, //the sum at just this node (*not* including its children) + lowerChild: Option[QTree[A]], + upperChild: Option[QTree[A]]): QTree[A] = { + require(offset >= 0, "QTree can not accept negative values") + + new QTree(sum, offset, level, count, lowerChild.orNull, upperChild.orNull) + } /** * The common case of wanting an offset and sum for the same value @@ -73,6 +91,12 @@ object QTree { */ def apply(k: Double): QTree[Double] = apply(k -> k) + /** + * End user consumable unapply for QTree + */ + def unapply[A](qtree: QTree[A]): Option[(Long, Int, Long, A, Option[QTree[A]], Option[QTree[A]])] = + Some((qtree.offset, qtree.level, qtree.count, qtree.sum, qtree.lowerChild, qtree.upperChild)) + /** * If you are sure you only care about the approximate histogram * features of QTree, you can save some space by using QTree[Unit] @@ -84,11 +108,35 @@ object QTree { * level gives a bin size of 2^level. By default this is 1/65536 (level = -16) */ def value(v: Double, level: Int = DefaultLevel): QTree[Unit] = apply(v -> (), level) + + private[algebird] def mergePeers[@specialized(Int, Long, Float, Double) A](left: QTree[A], right: QTree[A])(implicit monoid: Monoid[A]): QTree[A] = { + assert(right.lowerBound == left.lowerBound, "lowerBound " + right.lowerBound + " != " + left.lowerBound) + assert(right.level == left.level, "level " + right.level + " != " + left.level) + + new QTree[A](monoid.plus(left.sum, right.sum), + left.offset, + left.level, left.count + right.count, + mergeOptions(left.lowerChildNullable, right.lowerChildNullable), + mergeOptions(left.upperChildNullable, right.upperChildNullable)) + } + + private def mergeOptions[A](aNullable: QTree[A], bNullable: QTree[A])(implicit monoid: Monoid[A]): QTree[A] = + if (aNullable != null) { + if (bNullable != null) { + mergePeers(aNullable, bNullable) + } else aNullable + } else bNullable + + private[algebird] val cachedRangeCacheSize: Int = 20 + private[algebird] val cachedRangeLowerBound: Int = cachedRangeCacheSize * -1 + private[algebird] val rangeLut: Array[Double] = (cachedRangeLowerBound until cachedRangeCacheSize).map { level => + math.pow(2.0, level) + }.toArray[Double] } class QTreeSemigroup[A](k: Int)(implicit val underlyingMonoid: Monoid[A]) extends Semigroup[QTree[A]] { /** Override this if you want to change how frequently sumOption calls compress */ - def compressBatchSize: Int = 25 + def compressBatchSize: Int = 50 def plus(left: QTree[A], right: QTree[A]) = left.merge(right).compress(k) override def sumOption(items: TraversableOnce[QTree[A]]): Option[QTree[A]] = if (items.isEmpty) None else { @@ -108,32 +156,72 @@ class QTreeSemigroup[A](k: Int)(implicit val underlyingMonoid: Monoid[A]) extend } } -case class QTree[A]( - offset: Long, //the range this tree covers is offset*(2^level) ... (offset+1)*(2^level) - level: Int, - count: Long, //the total count for this node and all of its children - sum: A, //the sum at just this node (*not* including its children) - lowerChild: Option[QTree[A]], - upperChild: Option[QTree[A]]) { +class QTree[@specialized(Int, Long, Float, Double) A] private[algebird] ( + _sum: A, //the sum at just this node (*not* including its children) + _offset: Long, //the range this tree covers is offset*(2^level) ... (offset+1)*(2^level) + _level: Int, + _count: Long, //the total count for this node and all of its children + _lowerChildNullable: QTree[A], + _upperChildNullable: QTree[A]) + extends scala.Product6[Long, Int, Long, A, Option[QTree[A]], Option[QTree[A]]] with Serializable { + import QTree._ + + val range: Double = + if (_level < cachedRangeCacheSize && level > cachedRangeLowerBound) + rangeLut(_level + cachedRangeCacheSize) + else + math.pow(2.0, level) + + def lowerBound: Double = range * _offset + def upperBound: Double = range * (_offset + 1) + + def lowerChild: Option[QTree[A]] = Option(_lowerChildNullable) + def upperChild: Option[QTree[A]] = Option(_upperChildNullable) + + def this(offset: Long, level: Int, count: Long, sum: A, lowerChild: Option[QTree[A]], upperChild: Option[QTree[A]]) = + this(sum, offset, level, count, lowerChild.orNull, upperChild.orNull) - require(offset >= 0, "QTree can not accept negative values") + // Helpers to access the nullable ones from inside the QTree work + @inline private[algebird] def lowerChildNullable: QTree[A] = _lowerChildNullable + @inline private[algebird] def upperChildNullable: QTree[A] = _upperChildNullable - def range: Double = math.pow(2.0, level) - def lowerBound: Double = range * offset - def upperBound: Double = range * (offset + 1) + @inline def offset: Long = _offset + @inline def level: Int = _level + @inline def count: Long = _count + @inline def sum: A = _sum - private def extendToLevel(n: Int)(implicit monoid: Monoid[A]): QTree[A] = { + @inline def _1: Long = _offset + @inline def _2: Int = _level + @inline def _3: Long = _count + @inline def _4: A = _sum + @inline def _5: Option[QTree[A]] = lowerChild + @inline def _6: Option[QTree[A]] = upperChild + + override lazy val hashCode: Int = _root_.scala.runtime.ScalaRunTime._hashCode(this) + + override def toString: String = _root_.scala.runtime.ScalaRunTime._toString(this) + + override def equals(other: Any): Boolean = _root_.scala.runtime.ScalaRunTime._equals(this, other) + + override def canEqual(other: Any): Boolean = other.isInstanceOf[QTree[A]] + + override def productArity: Int = 6 + + @annotation.tailrec + private[algebird] final def extendToLevel(n: Int)(implicit monoid: Monoid[A]): QTree[A] = { if (n <= level) this else { - val nextLevel = level + 1 - val nextOffset = offset / 2 + val nextLevel = _level + 1 + val nextOffset = _offset / 2 + + // See benchmark in QTreeMicroBenchmark for why do this rather than the single if + // with 2 calls to QTree[A] in it. + val l = if (offset % 2 == 0) this else null + val r = if (offset % 2 == 0) null else this val parent = - if (offset % 2 == 0) - QTree[A](nextOffset, nextLevel, count, monoid.zero, Some(this), None) - else - QTree[A](nextOffset, nextLevel, count, monoid.zero, None, Some(this)) + new QTree[A](monoid.zero, nextOffset, nextLevel, _count, l, r) parent.extendToLevel(n) } @@ -146,8 +234,8 @@ case class QTree[A]( * level (that is, the power of 2 for the interval). */ private def commonAncestorLevel(other: QTree[A]) = { - val minLevel = level.min(other.level) - val leftOffset = offset << (level - minLevel) + val minLevel = _level.min(other.level) + val leftOffset = offset << (_level - minLevel) val rightOffset = other.offset << (other.level - minLevel) var offsetDiff = leftOffset ^ rightOffset var ancestorLevel = minLevel @@ -155,7 +243,7 @@ case class QTree[A]( ancestorLevel += 1 offsetDiff >>= 1 } - ancestorLevel.max(level).max(other.level) + ancestorLevel.max(_level).max(other.level) } /** @@ -169,26 +257,9 @@ case class QTree[A]( val commonAncestor = commonAncestorLevel(other) val left = extendToLevel(commonAncestor) val right = other.extendToLevel(commonAncestor) - left.mergeWithPeer(right) - } - - private def mergeWithPeer(other: QTree[A])(implicit monoid: Monoid[A]): QTree[A] = { - assert(other.lowerBound == lowerBound, "lowerBound " + other.lowerBound + " != " + lowerBound) - assert(other.level == level, "level " + other.level + " != " + level) - - copy(count = count + other.count, - sum = monoid.plus(sum, other.sum), - lowerChild = mergeOptions(lowerChild, other.lowerChild), - upperChild = mergeOptions(upperChild, other.upperChild)) + mergePeers(left, right) } - private def mergeOptions(a: Option[QTree[A]], b: Option[QTree[A]])(implicit monoid: Monoid[A]): Option[QTree[A]] = - (a, b) match { - case (Some(qa), Some(qb)) => Some(qa.mergeWithPeer(qb)) - case (None, right) => right - case (left, None) => left - } - /** * give lower and upper bounds respectively of the percentile * value given. For instance, quantileBounds(0.5) would give @@ -197,36 +268,49 @@ case class QTree[A]( def quantileBounds(p: Double): (Double, Double) = { require(p >= 0.0 && p <= 1.0, "The given percentile must be of the form 0 <= p <= 1.0") - val rank = math.floor(count * p).toLong + val rank = math.floor(_count * p).toLong // get is safe below, because findRankLowerBound only returns // None if rank > count, but due to construction rank <= count - (findRankLowerBound(rank).get, findRankUpperBound(rank).get) + (findRankLowerBound(rank), findRankUpperBound(rank)) } - private def findRankLowerBound(rank: Long): Option[Double] = - if (rank > count) - None + private def findRankLowerBound(rank: Long): java.lang.Double = + if (rank > _count) + null else { val childCounts = mapChildrenWithDefault(0L)(_.count) - val parentCount = count - childCounts._1 - childCounts._2 - lowerChild.flatMap { _.findRankLowerBound(rank - parentCount) } - .orElse { - val newRank = rank - childCounts._1 - parentCount - if (newRank <= 0) - Some(lowerBound) - else - upperChild.flatMap{ _.findRankLowerBound(newRank) } - } + val parentCount = _count - childCounts._1 - childCounts._2 + val r2 = if (lowerChildNullable != null) lowerChildNullable.findRankLowerBound(rank - parentCount) else null + + if (r2 == null) { + val newRank = rank - childCounts._1 - parentCount + if (newRank <= 0) + lowerBound + else if (upperChildNullable != null) + upperChildNullable.findRankLowerBound(newRank) + else + null + } else r2 } - private def findRankUpperBound(rank: Long): Option[Double] = { - if (rank > count) - None + private def findRankUpperBound(rank: Long): java.lang.Double = { + if (rank > _count) + null else { - lowerChild.flatMap{ _.findRankUpperBound(rank) }.orElse { - val lowerCount = lowerChild.map{ _.count }.getOrElse(0L) - upperChild.flatMap{ _.findRankUpperBound(rank - lowerCount) }.orElse(Some(upperBound)) - } + val r = if (lowerChildNullable != null) { + lowerChildNullable.findRankUpperBound(rank) + } else null + if (r == null) { + val lowerCount = if (lowerChildNullable == null) 0L else lowerChildNullable.count + + val r2: java.lang.Double = if (upperChildNullable != null) { + upperChildNullable.findRankUpperBound(rank - lowerCount) + } else null + + if (r2 == null) { + upperBound + } else r2 + } else r } } @@ -271,8 +355,8 @@ case class QTree[A]( * are at most 2^k nodes, but usually fewer. */ def compress(k: Int)(implicit m: Monoid[A]): QTree[A] = { - val minCount = count >> k - if ((minCount > 1L) || (count < 1L)) { + val minCount = _count >> k + if ((minCount > 1L) || (_count < 1L)) { pruneChildren(minCount) } else { // count > 0, so for all nodes, if minCount <= 1, then count >= minCount @@ -285,29 +369,30 @@ case class QTree[A]( // If we don't prune we MUST return this private def pruneChildren(minCount: Long)(implicit m: Monoid[A]): QTree[A] = - if (count < minCount) { - copy(sum = totalSum, lowerChild = None, upperChild = None) + if (_count < minCount) { + new QTree[A](totalSum, _offset, _level, _count, null, null) } else { - val newLower = pruneChild(minCount, lowerChild) - val lowerNotPruned = newLower eq lowerChild - val newUpper = pruneChild(minCount, upperChild) - val upperNotPruned = newUpper eq upperChild + val newLower = pruneChild(minCount, lowerChildNullable) + val lowerNotPruned = newLower eq lowerChildNullable + val newUpper = pruneChild(minCount, upperChildNullable) + val upperNotPruned = newUpper eq upperChildNullable if (lowerNotPruned && upperNotPruned) this else - copy(lowerChild = newLower, upperChild = newUpper) + new QTree[A](_sum, _offset, _level, _count, newLower, newUpper) } // If we don't prune we MUST return child @inline private def pruneChild(minCount: Long, - child: Option[QTree[A]])(implicit m: Monoid[A]): Option[QTree[A]] = child match { - case exists @ Some(oldChild) => - val newChild = oldChild.pruneChildren(minCount) - if (newChild eq oldChild) exists // need to pass the same reference if we don't change - else Some(newChild) - case n @ None => n // make sure we pass the same ref out - } + childNullable: QTree[A])(implicit m: Monoid[A]): QTree[A] = + if (childNullable == null) + null + else { + val newChild = childNullable.pruneChildren(minCount) + if (newChild eq childNullable) childNullable // need to pass the same reference if we don't change + else newChild + } /** * How many total nodes are there in the QTree. @@ -334,20 +419,20 @@ case class QTree[A]( private def parentCount = { val childCounts = mapChildrenWithDefault(0L){ _.count } - count - childCounts._1 - childCounts._2 + _count - childCounts._1 - childCounts._2 } /** * A debug method that prints the QTree to standard out using print/println */ - def dump { - for (i <- (20 to level by -1)) + def dump() { + for (i <- (20 to _level by -1)) print(" ") - print(lowerBound + " - " + upperBound + ": " + count) + print(lowerBound + " - " + upperBound + ": " + _count) if (lowerChild.isDefined || upperChild.isDefined) { print(" (" + parentCount + ")") } - println(" {" + sum + "}") + println(" {" + _sum + "}") lowerChild.foreach{ _.dump } upperChild.foreach{ _.dump } } diff --git a/project/Build.scala b/project/Build.scala index 189465fc7..a09a39acb 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -35,7 +35,7 @@ object AlgebirdBuild extends Build { javacOptions ++= Seq("-target", "1.6", "-source", "1.6"), - scalacOptions ++= Seq("-unchecked", "-deprecation", "-language:implicitConversions", "-language:higherKinds", "-language:existentials"), + scalacOptions ++= Seq("-unchecked", "-deprecation", "-optimize", "-Xlint", "-language:implicitConversions", "-language:higherKinds", "-language:existentials"), scalacOptions <++= (scalaVersion) map { sv => if (sv startsWith "2.10") @@ -124,6 +124,7 @@ object AlgebirdBuild extends Build { algebirdCore, algebirdUtil, algebirdBijection, + algebirdBenchmark, algebirdSpark ) @@ -131,8 +132,7 @@ object AlgebirdBuild extends Build { val id = "algebird-%s".format(name) Project(id = id, base = file(id), settings = sharedSettings ++ Seq( Keys.name := id, - previousArtifact := youngestForwardCompatible(name)) ++ - JmhPlugin.projectSettings + previousArtifact := youngestForwardCompatible(name)) ) } @@ -167,7 +167,7 @@ object AlgebirdBuild extends Build { }, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full) ).dependsOn(algebirdCore) - lazy val algebirdBenchmark = module("benchmark").settings( + lazy val algebirdBenchmark = module("benchmark").settings(JmhPlugin.projectSettings:_*).settings( libraryDependencies ++= Seq("com.twitter" %% "bijection-core" % "0.8.0") ).dependsOn(algebirdCore, algebirdUtil, algebirdTest % "test->compile").enablePlugins(JmhPlugin)