Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Track state in summingbird-online as an Iterator rather than a Seq. #703

Merged
merged 8 commits into from
Jan 4, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ val storehausVersion = "0.15.0-RC1"
val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example
val tormentaVersion = "0.11.1"
val utilVersion = "6.34.0"
val chainVersion = "0.1.0"

val extraSettings = mimaDefaultSettings ++ scalariformSettings

Expand Down Expand Up @@ -235,7 +236,11 @@ val ignoredABIProblems = {
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.get"),
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.getOrElse"),
exclude[DirectMissingMethodProblem]("com.twitter.summingbird.storm.BaseBolt.apply"),
exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client")
exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client"),
exclude[DirectMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"),
exclude[ReversedMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"),
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.AsyncBase.notifyFailure"),
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.Summer.notifyFailure")
)
}

Expand Down Expand Up @@ -293,7 +298,8 @@ lazy val summingbirdOnline = module("online").settings(
"com.twitter" %% "chill" % chillVersion,
"com.twitter" %% "storehaus-algebra" % storehausVersion,
"com.twitter" %% "util-core" % utilVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test"
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test",
"org.spire-math" %% "chain" % chainVersion
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
Expand All @@ -315,6 +321,7 @@ lazy val summingbirdStorm = module("storm").settings(
"com.twitter" %% "scalding-args" % scaldingVersion,
"com.twitter" %% "tormenta-core" % tormentaVersion,
"com.twitter" %% "util-core" % utilVersion,
"org.spire-math" %% "chain" % chainVersion,
stormDep % "provided"
)
).dependsOn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ limitations under the License.

package com.twitter.summingbird.online.executor

import com.twitter.algebird.Semigroup
import com.twitter.summingbird.online.FutureQueue
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util._

import chain.Chain
import scala.util.Try

abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I, O, S] {
Expand All @@ -29,21 +30,30 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti
* cases that need to complete operations after or before doing a FlatMapOperation or
* doing a store merge
*/
def apply(state: S, in: I): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]
def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]] = Future.value(Nil)
def apply(state: S, in: I): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]]
def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]] = Future.value(Nil)

/**
 * Semigroup instance for `Chain[T]`.
 *
 * `plus` combines two chains by concatenating them as `l ++ r`.
 */
implicit def chainSemigroup[T]: Semigroup[Chain[T]] = new Semigroup[Chain[T]] {
override def plus(l: Chain[T], r: Chain[T]): Chain[T] = l ++ r
}

private[executor] lazy val futureQueue = new FutureQueue[Seq[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime)
private[executor] lazy val futureQueue = new FutureQueue[Chain[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime)

override def executeTick: TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] =
override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
finishExecute(None, tick)

override def execute(state: S, data: I): TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] =
override def execute(state: S, data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
finishExecute(Some(state), apply(state, data))

private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]) = {
private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]]) = {
fIn.respond {
case Return(iter) => futureQueue.addAll(iter)
case Throw(ex) => futureQueue.add(failStateOpt.toSeq, Future.exception(ex))
case Throw(ex) =>
val failState = failStateOpt match {
case Some(state) => Chain.single(state)
case None => Chain.Empty
}
futureQueue.add(failState, Future.exception(ex))
}
futureQueue.dequeue(maxEmitPerExec.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.twitter.summingbird.online.option.{
MaxFutureWaitTime,
MaxEmitPerExecute
}
import chain.Chain
import scala.collection.mutable.{ Map => MMap, ListBuffer }
// These CMaps we generate in the FFM, we use it as an immutable wrapper around
// a mutable map.
Expand Down Expand Up @@ -67,15 +68,16 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]](
val lockedOp = Externalizer(flatMapOp)

type SummerK = Key
type SummerV = (Seq[S], Value)
lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Seq[S], Value)]])
type SummerV = (Chain[S], Value)

lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Chain[S], Value)]])

// Lazy transient as const futures are not serializable
@transient private[this] lazy val noData = List(
(List(), Future.value(Nil))
(Chain.empty, Future.value(Nil))
)

private def formatResult(outData: Map[Key, (Seq[S], Value)]): TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])] = {
private def formatResult(outData: Map[Key, (Chain[S], Value)]): TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])] = {
if (outData.isEmpty) {
noData
} else {
Expand All @@ -85,34 +87,34 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]](
case (k, (listS, v)) =>
val newK = summerShards.summerIdFor(k)
val (buffer, mmap) = mmMap.getOrElseUpdate(newK, (ListBuffer[S](), MMap[Key, Value]()))
buffer ++= listS
buffer ++= listS.iterator
mmap += k -> v
}

mmMap.toIterator.map {
case (outerKey, (listS, innerMap)) =>
(listS, Future.value(List((outerKey, innerMap))))
(Chain(listS), Future.value(List((outerKey, innerMap))))
}
}
}

override def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] =
override def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] =
sCache.tick.map(formatResult(_))

def cache(state: S,
items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] =
items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] =
try {
val itemL = items.toList
if (itemL.size > 0) {
state.fanOut(itemL.size)
sCache.addAll(itemL.map {
case (k, v) =>
k -> (List(state), v)
k -> (Chain.single(state), v)
}).map(formatResult(_))
} else { // Here we handle mapping to nothing, option map et. al
Future.value(
List(
(List(state), Future.value(Nil))
(Chain.single(state), Future.value(Nil))
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.twitter.summingbird.online.option.{
MaxFutureWaitTime,
MaxEmitPerExecute
}
import chain.Chain

class IntermediateFlatMap[T, U, S](
@transient flatMapOp: FlatMapOperation[T, U],
Expand All @@ -35,9 +36,9 @@ class IntermediateFlatMap[T, U, S](
val lockedOp = Externalizer(flatMapOp)

override def apply(state: S,
tup: T): Future[Iterable[(List[S], Future[TraversableOnce[U]])]] =
tup: T): Future[Iterable[(Chain[S], Future[TraversableOnce[U]])]] =
lockedOp.get.apply(tup).map { res =>
List((List(state), Future.value(res)))
List((Chain.single(state), Future.value(res)))
}

override def cleanup(): Unit = lockedOp.get.close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ limitations under the License.

package com.twitter.summingbird.online.executor

import chain.Chain
import scala.util.Try

trait OperationContainer[Input, Output, State] {
def init(): Unit = {}
def cleanup(): Unit = {}

def executeTick: TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])]
def executeTick: TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])]
def execute(state: State,
data: Input): TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])]
def notifyFailure(inputs: Seq[State], e: Throwable): Unit = {}
}
data: Input): TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])]
def notifyFailure(inputs: Chain[State], e: Throwable): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.twitter.storehaus.algebra.Mergeable
import com.twitter.summingbird.online.{ FlatMapOperation, Externalizer }
import com.twitter.summingbird.online.option._

import chain.Chain

// These CMaps we generate in the FFM, we use it as an immutable wrapper around
// a mutable map.
import scala.collection.{ Map => CMap }
Expand Down Expand Up @@ -72,7 +74,8 @@ class Summer[Key, Value: Semigroup, Event, S](
lazy val storePromise = Promise[Mergeable[Key, Value]]
lazy val store = Await.result(storePromise)

lazy val sSummer: AsyncSummer[(Key, (Seq[InputState[S]], Value)), Map[Key, (Seq[InputState[S]], Value)]] = summerBuilder.getSummer[Key, (Seq[InputState[S]], Value)](implicitly[Semigroup[(Seq[InputState[S]], Value)]])
lazy val sSummer: AsyncSummer[(Key, (Chain[InputState[S]], Value)), Map[Key, (Chain[InputState[S]], Value)]] =
summerBuilder.getSummer[Key, (Chain[InputState[S]], Value)](implicitly[Semigroup[(Chain[InputState[S]], Value)]])

val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift)
val successHandlerBox = Externalizer(successHandler)
Expand All @@ -86,12 +89,12 @@ class Summer[Key, Value: Semigroup, Event, S](
successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
}

override def notifyFailure(inputs: Seq[InputState[S]], error: Throwable): Unit = {
override def notifyFailure(inputs: Chain[InputState[S]], error: Throwable): Unit = {
super.notifyFailure(inputs, error)
exceptionHandlerBox.get.apply(error)
}

private def handleResult(kvs: Map[Key, (Seq[InputState[S]], Value)]): TraversableOnce[(Seq[InputState[S]], Future[TraversableOnce[Event]])] =
private def handleResult(kvs: Map[Key, (Chain[InputState[S]], Value)]): TraversableOnce[(Chain[InputState[S]], Future[TraversableOnce[Event]])] =
store.multiMerge(kvs.mapValues(_._2)).iterator.map {
case (k, beforeF) =>
val (tups, delta) = kvs(k)
Expand All @@ -110,7 +113,7 @@ class Summer[Key, Value: Semigroup, Event, S](
state.fanOut(innerTuples.size)
val cacheEntries = innerTuples.map {
case (k, v) =>
(k, (List(state), v))
(k, (Chain.single(state), v))
}

sSummer.addAll(cacheEntries).map(handleResult(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,41 @@ import com.twitter.conversions.time._
import com.twitter.summingbird.online.FutureQueue
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util.{ Await, Future, Promise }
import chain.Chain
import org.scalatest.WordSpec
import scala.util.Try

class AsyncBaseSpec extends WordSpec {
val data = Seq((Seq(100, 104, 99), Future(Seq(9, 10, 13))), (Seq(12, 19), Future(Seq(100, 200, 500))))
val dequeueData = List((Seq(8, 9), Try(Seq(4, 5, 6))))
val data = Seq(
(Chain(Seq(100, 104, 99)), Future(Seq(9, 10, 13))),
(Chain(Seq(12, 19)), Future(Seq(100, 200, 500))))

class TestFutureQueue extends FutureQueue[Seq[Int], TraversableOnce[Int]](
val dequeueData = List((Chain(Seq(8, 9)), Try(Seq(4, 5, 6))))

class TestFutureQueue extends FutureQueue[Chain[Int], TraversableOnce[Int]](
MaxWaitingFutures(100),
MaxFutureWaitTime(1.minute)
) {
var added = false
var addedData: (Seq[Int], Future[TraversableOnce[Int]]) = _
var addedAllData: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])] = _
var addedData: (Chain[Int], Future[TraversableOnce[Int]]) = _
var addedAllData: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])] = _
var dequeued = false
var dequeuedCount: Int = 0

override def add(state: Seq[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized {
override def add(state: Chain[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized {
assert(!added)
added = true
addedData = (state, fut)
}

override def addAll(
iter: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]): Unit = synchronized {
iter: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]): Unit = synchronized {
assert(!added)
added = true
addedAllData = iter
}

override def dequeue(maxItems: Int): Seq[(Seq[Int], Try[TraversableOnce[Int]])] = synchronized {
override def dequeue(maxItems: Int): Seq[(Chain[Int], Try[TraversableOnce[Int]])] = synchronized {
assert(!dequeued)
dequeued = true
dequeuedCount = maxItems
Expand All @@ -60,8 +64,8 @@ class AsyncBaseSpec extends WordSpec {

class TestAsyncBase(
queue: TestFutureQueue,
tickData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"),
applyData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int](
tickData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"),
applyData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int](
MaxWaitingFutures(100),
MaxFutureWaitTime(1.minute),
MaxEmitPerExecute(57)
Expand All @@ -71,7 +75,7 @@ class AsyncBaseSpec extends WordSpec {
override def tick = tickData
}

def promise = Promise[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]]
def promise = Promise[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]]

"Queues tick on executeTick" in {
val queue = new TestFutureQueue
Expand Down Expand Up @@ -112,7 +116,7 @@ class AsyncBaseSpec extends WordSpec {

p.setException(ex)
assert(queue.added)
assert(queue.addedData._1 === Nil)
assert(queue.addedData._1.iterator.isEmpty)
assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) })
}

Expand All @@ -127,7 +131,7 @@ class AsyncBaseSpec extends WordSpec {

p.setException(ex)
assert(queue.added)
assert(queue.addedData._1 === List(1089))
assert(queue.addedData._1 === Chain.single(1089))
assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.twitter.summingbird.online.executor.InputState
import com.twitter.summingbird.option.JobId
import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeStats }
import com.twitter.summingbird.online.Externalizer
import chain.Chain
import scala.collection.JavaConverters._
import java.util.{ List => JList }
import org.slf4j.{ Logger, LoggerFactory }
Expand Down Expand Up @@ -118,10 +119,10 @@ case class BaseBolt[I, O](jobID: JobId,
logger.error(message, err)
}

private def fail(inputs: Seq[InputState[Tuple]], error: Throwable): Unit = {
private def fail(inputs: Chain[InputState[Tuple]], error: Throwable): Unit = {
executor.notifyFailure(inputs, error)
if (!earlyAck) { inputs.foreach(_.fail(collector.fail(_))) }
logError("Storm DAG of: %d tuples failed".format(inputs.size), error)
logError("Storm DAG of: %d tuples failed".format(inputs.iterator.size), error)
}

override def execute(tuple: Tuple) = {
Expand Down Expand Up @@ -149,12 +150,13 @@ case class BaseBolt[I, O](jobID: JobId,
}
}

private def finish(inputs: Seq[InputState[Tuple]], results: TraversableOnce[O]) {
private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) {
val tuples = inputs.iterator.map(_.state).toList
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we move this line to 158 (only materialize the List if we have dependants and we anchor)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning was that we need to iterate through the chain anyway to get the size for the log statement at the end of the function. But I agree: materialization of the list should be avoided in those cases as well. Proposed a fix.

var emitCount = 0
if (hasDependants) {
if (anchorTuples.anchor) {
results.foreach { result =>
collector.emit(inputs.map(_.state).asJava, encoder(result))
collector.emit(tuples.asJava, encoder(result))
emitCount += 1
}
} else { // don't anchor
Expand All @@ -167,7 +169,7 @@ case class BaseBolt[I, O](jobID: JobId,
// Always ack a tuple on completion:
if (!earlyAck) { inputs.foreach(_.ack(collector.ack(_))) }

logger.debug("bolt finished processed {} linked tuples, emitted: {}", inputs.size, emitCount)
logger.debug("bolt finished processed {} linked tuples, emitted: {}", tuples.size, emitCount)
}

override def prepare(conf: JMap[_, _], context: TopologyContext, oc: OutputCollector) {
Expand Down