From 50eb9b4b95c2c3124db0b9db58b6698e92917e4b Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Sat, 24 Dec 2016 00:49:37 -0800 Subject: [PATCH 1/8] Track state in summingbird-online as an Iterator rather than a Seq. This should avoid n^2 compute comlexity when summing single element lists of Storm tuples. --- .../online/executor/AsyncBase.scala | 20 ++++++++----- .../online/executor/FinalFlatMap.scala | 19 ++++++------ .../online/executor/IntermediateFlatMap.scala | 4 +-- .../online/executor/OperationContainer.scala | 6 ++-- .../summingbird/online/executor/Summer.scala | 9 +++--- .../online/executor/AsyncBaseSpec.scala | 29 ++++++++++--------- .../twitter/summingbird/storm/BaseBolt.scala | 6 ++-- 7 files changed, 51 insertions(+), 42 deletions(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala index 90ac344e3..408a7f805 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala @@ -16,10 +16,10 @@ limitations under the License. package com.twitter.summingbird.online.executor +import com.twitter.algebird.Semigroup import com.twitter.summingbird.online.FutureQueue import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import com.twitter.util._ - import scala.util.Try abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I, O, S] { @@ -29,21 +29,25 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti * cases that need to complete operations after or before doing a FlatMapOperation or * doing a store merge */ - def apply(state: S, in: I): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]] - def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]] = Future.value(Nil) + def apply(state: S, in: I): Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]] + def tick: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]] = Future.value(Nil) + + implicit def itertorSemigroup[T]: Semigroup[Iterator[T]] = new Semigroup[Iterator[T]] { + override def plus(l: Iterator[T], r: Iterator[T]): Iterator[T] = l ++ r + } - private[executor] lazy val futureQueue = new FutureQueue[Seq[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) + private[executor] lazy val futureQueue = new FutureQueue[Iterator[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) - override def executeTick: TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] = + override def executeTick: TraversableOnce[(Iterator[S], Try[TraversableOnce[O]])] = finishExecute(None, tick) - override def execute(state: S, data: I): TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] = + override def execute(state: S, data: I): TraversableOnce[(Iterator[S], Try[TraversableOnce[O]])] = finishExecute(Some(state), apply(state, data)) - private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]) = { + private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]]) = { fIn.respond { case Return(iter) => futureQueue.addAll(iter) - case Throw(ex) => futureQueue.add(failStateOpt.toSeq, Future.exception(ex)) + case Throw(ex) => futureQueue.add(failStateOpt.toIterator, Future.exception(ex)) } futureQueue.dequeue(maxEmitPerExec.get) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala index 5f9b59fbb..23c4a172b 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala @@ -67,15 +67,16 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( val lockedOp = Externalizer(flatMapOp) type SummerK = Key - type SummerV = (Seq[S], Value) - lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Seq[S], Value)]]) + type SummerV = (Iterator[S], Value) + + lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Iterator[S], Value)]]) // Lazy transient as const futures are not serializable @transient private[this] lazy val noData = List( - (List(), Future.value(Nil)) + (List().toIterator, Future.value(Nil)) ) - private def formatResult(outData: Map[Key, (Seq[S], Value)]): TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])] = { + private def formatResult(outData: Map[Key, (Iterator[S], Value)]): TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])] = { if (outData.isEmpty) { noData } else { @@ -91,28 +92,28 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( mmMap.toIterator.map { case (outerKey, (listS, innerMap)) => - (listS, Future.value(List((outerKey, innerMap)))) + (listS.toIterator, Future.value(List((outerKey, innerMap)))) } } } - override def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] = + override def tick: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])]] = sCache.tick.map(formatResult(_)) def cache(state: S, - items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] = + items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])]] = try { val itemL = items.toList if (itemL.size > 0) { state.fanOut(itemL.size) sCache.addAll(itemL.map { case (k, v) => - k -> (List(state), v) + k -> (Iterator.single(state), v) }).map(formatResult(_)) } else { // Here we handle mapping to nothing, option map et. al Future.value( List( - (List(state), Future.value(Nil)) + (Iterator.single(state), Future.value(Nil)) ) ) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala index 8b3559179..cbebda84b 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala @@ -35,9 +35,9 @@ class IntermediateFlatMap[T, U, S]( val lockedOp = Externalizer(flatMapOp) override def apply(state: S, - tup: T): Future[Iterable[(List[S], Future[TraversableOnce[U]])]] = + tup: T): Future[Iterable[(Iterator[S], Future[TraversableOnce[U]])]] = lockedOp.get.apply(tup).map { res => - List((List(state), Future.value(res))) + List((Iterator.single(state), Future.value(res))) } override def cleanup(): Unit = lockedOp.get.close diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala index ca3a042bc..87d3a7d35 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala @@ -22,8 +22,8 @@ trait OperationContainer[Input, Output, State] { def init(): Unit = {} def cleanup(): Unit = {} - def executeTick: TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])] + def executeTick: TraversableOnce[(Iterator[State], Try[TraversableOnce[Output]])] def execute(state: State, - data: Input): TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])] - def notifyFailure(inputs: Seq[State], e: Throwable): Unit = {} + data: Input): TraversableOnce[(Iterator[State], Try[TraversableOnce[Output]])] + def notifyFailure(inputs: Iterator[State], e: Throwable): Unit = {} } \ No newline at end of file diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala index 5b79d7df7..346598192 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala @@ -72,7 +72,8 @@ class Summer[Key, Value: Semigroup, Event, S]( lazy val storePromise = Promise[Mergeable[Key, Value]] lazy val store = Await.result(storePromise) - lazy val sSummer: AsyncSummer[(Key, (Seq[InputState[S]], Value)), Map[Key, (Seq[InputState[S]], Value)]] = summerBuilder.getSummer[Key, (Seq[InputState[S]], Value)](implicitly[Semigroup[(Seq[InputState[S]], Value)]]) + lazy val sSummer: AsyncSummer[(Key, (Iterator[InputState[S]], Value)), Map[Key, (Iterator[InputState[S]], Value)]] = + summerBuilder.getSummer[Key, (Iterator[InputState[S]], Value)](implicitly[Semigroup[(Iterator[InputState[S]], Value)]]) val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift) val successHandlerBox = Externalizer(successHandler) @@ -86,12 +87,12 @@ class Summer[Key, Value: Semigroup, Event, S]( successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None } - override def notifyFailure(inputs: Seq[InputState[S]], error: Throwable): Unit = { + override def notifyFailure(inputs: Iterator[InputState[S]], error: Throwable): Unit = { super.notifyFailure(inputs, error) exceptionHandlerBox.get.apply(error) } - private def handleResult(kvs: Map[Key, (Seq[InputState[S]], Value)]): TraversableOnce[(Seq[InputState[S]], Future[TraversableOnce[Event]])] = + private def handleResult(kvs: Map[Key, (Iterator[InputState[S]], Value)]): TraversableOnce[(Iterator[InputState[S]], Future[TraversableOnce[Event]])] = store.multiMerge(kvs.mapValues(_._2)).iterator.map { case (k, beforeF) => val (tups, delta) = kvs(k) @@ -110,7 +111,7 @@ class Summer[Key, Value: Semigroup, Event, S]( state.fanOut(innerTuples.size) val cacheEntries = innerTuples.map { case (k, v) => - (k, (List(state), v)) + (k, (Iterator.single(state), v)) } sSummer.addAll(cacheEntries).map(handleResult(_)) diff --git a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala index 3b92cb838..4a7f89b1a 100644 --- a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala +++ b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala @@ -24,33 +24,36 @@ import org.scalatest.WordSpec import scala.util.Try class AsyncBaseSpec extends WordSpec { - val data = Seq((Seq(100, 104, 99), Future(Seq(9, 10, 13))), (Seq(12, 19), Future(Seq(100, 200, 500)))) - val dequeueData = List((Seq(8, 9), Try(Seq(4, 5, 6)))) + val data = Seq( + (Seq(100, 104, 99).toIterator, Future(Seq(9, 10, 13))), + (Seq(12, 19).toIterator, Future(Seq(100, 200, 500)))) - class TestFutureQueue extends FutureQueue[Seq[Int], TraversableOnce[Int]]( + val dequeueData = List((Seq(8, 9).toIterator, Try(Seq(4, 5, 6)))) + + class TestFutureQueue extends FutureQueue[Iterator[Int], TraversableOnce[Int]]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute) ) { var added = false - var addedData: (Seq[Int], Future[TraversableOnce[Int]]) = _ - var addedAllData: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])] = _ + var addedData: (Iterator[Int], Future[TraversableOnce[Int]]) = _ + var addedAllData: TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])] = _ var dequeued = false var dequeuedCount: Int = 0 - override def add(state: Seq[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { + override def add(state: Iterator[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { assert(!added) added = true addedData = (state, fut) } override def addAll( - iter: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { + iter: TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { assert(!added) added = true addedAllData = iter } - override def dequeue(maxItems: Int): Seq[(Seq[Int], Try[TraversableOnce[Int]])] = synchronized { + override def dequeue(maxItems: Int): Seq[(Iterator[Int], Try[TraversableOnce[Int]])] = synchronized { assert(!dequeued) dequeued = true dequeuedCount = maxItems @@ -60,8 +63,8 @@ class AsyncBaseSpec extends WordSpec { class TestAsyncBase( queue: TestFutureQueue, - tickData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), - applyData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( + tickData: => Future[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), + applyData: => Future[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute), MaxEmitPerExecute(57) @@ -71,7 +74,7 @@ class AsyncBaseSpec extends WordSpec { override def tick = tickData } - def promise = Promise[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] + def promise = Promise[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] "Queues tick on executeTick" in { val queue = new TestFutureQueue @@ -112,7 +115,7 @@ class AsyncBaseSpec extends WordSpec { p.setException(ex) assert(queue.added) - assert(queue.addedData._1 === Nil) + assert(queue.addedData._1.toList === Nil) assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) }) } @@ -127,7 +130,7 @@ class AsyncBaseSpec extends WordSpec { p.setException(ex) assert(queue.added) - assert(queue.addedData._1 === List(1089)) + assert(queue.addedData._1.toList === List(1089)) assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) }) } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index a0e239d83..bda5d9de5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -118,7 +118,7 @@ case class BaseBolt[I, O](jobID: JobId, logger.error(message, err) } - private def fail(inputs: Seq[InputState[Tuple]], error: Throwable): Unit = { + private def fail(inputs: Iterator[InputState[Tuple]], error: Throwable): Unit = { executor.notifyFailure(inputs, error) if (!earlyAck) { inputs.foreach(_.fail(collector.fail(_))) } logError("Storm DAG of: %d tuples failed".format(inputs.size), error) @@ -149,12 +149,12 @@ case class BaseBolt[I, O](jobID: JobId, } } - private def finish(inputs: Seq[InputState[Tuple]], results: TraversableOnce[O]) { + private def finish(inputs: Iterator[InputState[Tuple]], results: TraversableOnce[O]) { var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { results.foreach { result => - collector.emit(inputs.map(_.state).asJava, encoder(result)) + collector.emit(inputs.map(_.state).toList.asJava, encoder(result)) emitCount += 1 } } else { // don't anchor From 84842a42530bf72ba3576ea33001800f43521518 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Sat, 24 Dec 2016 17:35:12 -0800 Subject: [PATCH 2/8] Use Stream instead of iterator for tracking InputState --- .../online/executor/AsyncBase.scala | 18 +++++++------- .../online/executor/FinalFlatMap.scala | 18 +++++++------- .../online/executor/IntermediateFlatMap.scala | 4 ++-- .../online/executor/OperationContainer.scala | 6 ++--- .../summingbird/online/executor/Summer.scala | 10 ++++---- .../online/executor/AsyncBaseSpec.scala | 24 +++++++++---------- .../twitter/summingbird/storm/BaseBolt.scala | 4 ++-- 7 files changed, 42 insertions(+), 42 deletions(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala index 408a7f805..3dab8f24f 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala @@ -29,25 +29,25 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti * cases that need to complete operations after or before doing a FlatMapOperation or * doing a store merge */ - def apply(state: S, in: I): Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]] - def tick: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]] = Future.value(Nil) + def apply(state: S, in: I): Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] + def tick: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] = Future.value(Nil) - implicit def itertorSemigroup[T]: Semigroup[Iterator[T]] = new Semigroup[Iterator[T]] { - override def plus(l: Iterator[T], r: Iterator[T]): Iterator[T] = l ++ r + implicit def itertorSemigroup[T]: Semigroup[Stream[T]] = new Semigroup[Stream[T]] { + override def plus(l: Stream[T], r: Stream[T]): Stream[T] = l ++ r } - private[executor] lazy val futureQueue = new FutureQueue[Iterator[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) + private[executor] lazy val futureQueue = new FutureQueue[Stream[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) - override def executeTick: TraversableOnce[(Iterator[S], Try[TraversableOnce[O]])] = + override def executeTick: TraversableOnce[(Stream[S], Try[TraversableOnce[O]])] = finishExecute(None, tick) - override def execute(state: S, data: I): TraversableOnce[(Iterator[S], Try[TraversableOnce[O]])] = + override def execute(state: S, data: I): TraversableOnce[(Stream[S], Try[TraversableOnce[O]])] = finishExecute(Some(state), apply(state, data)) - private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[O]])]]) = { + private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]]) = { fIn.respond { case Return(iter) => futureQueue.addAll(iter) - case Throw(ex) => futureQueue.add(failStateOpt.toIterator, Future.exception(ex)) + case Throw(ex) => futureQueue.add(failStateOpt.toStream, Future.exception(ex)) } futureQueue.dequeue(maxEmitPerExec.get) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala index 23c4a172b..b947106b7 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala @@ -67,16 +67,16 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( val lockedOp = Externalizer(flatMapOp) type SummerK = Key - type SummerV = (Iterator[S], Value) + type SummerV = (Stream[S], Value) - lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Iterator[S], Value)]]) + lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Stream[S], Value)]]) // Lazy transient as const futures are not serializable @transient private[this] lazy val noData = List( - (List().toIterator, Future.value(Nil)) + (List().toStream, Future.value(Nil)) ) - private def formatResult(outData: Map[Key, (Iterator[S], Value)]): TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])] = { + private def formatResult(outData: Map[Key, (Stream[S], Value)]): TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])] = { if (outData.isEmpty) { noData } else { @@ -92,28 +92,28 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( mmMap.toIterator.map { case (outerKey, (listS, innerMap)) => - (listS.toIterator, Future.value(List((outerKey, innerMap)))) + (listS.toStream, Future.value(List((outerKey, innerMap)))) } } } - override def tick: Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])]] = + override def tick: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])]] = sCache.tick.map(formatResult(_)) def cache(state: S, - items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Iterator[S], Future[TraversableOnce[OutputElement]])]] = + items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])]] = try { val itemL = items.toList if (itemL.size > 0) { state.fanOut(itemL.size) sCache.addAll(itemL.map { case (k, v) => - k -> (Iterator.single(state), v) + k -> (Stream(state), v) }).map(formatResult(_)) } else { // Here we handle mapping to nothing, option map et. al Future.value( List( - (Iterator.single(state), Future.value(Nil)) + (Stream(state), Future.value(Nil)) ) ) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala index cbebda84b..9e70418c1 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala @@ -35,9 +35,9 @@ class IntermediateFlatMap[T, U, S]( val lockedOp = Externalizer(flatMapOp) override def apply(state: S, - tup: T): Future[Iterable[(Iterator[S], Future[TraversableOnce[U]])]] = + tup: T): Future[Iterable[(Stream[S], Future[TraversableOnce[U]])]] = lockedOp.get.apply(tup).map { res => - List((Iterator.single(state), Future.value(res))) + List((Stream(state), Future.value(res))) } override def cleanup(): Unit = lockedOp.get.close diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala index 87d3a7d35..a04384b78 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala @@ -22,8 +22,8 @@ trait OperationContainer[Input, Output, State] { def init(): Unit = {} def cleanup(): Unit = {} - def executeTick: TraversableOnce[(Iterator[State], Try[TraversableOnce[Output]])] + def executeTick: TraversableOnce[(Stream[State], Try[TraversableOnce[Output]])] def execute(state: State, - data: Input): TraversableOnce[(Iterator[State], Try[TraversableOnce[Output]])] - def notifyFailure(inputs: Iterator[State], e: Throwable): Unit = {} + data: Input): TraversableOnce[(Stream[State], Try[TraversableOnce[Output]])] + def notifyFailure(inputs: Stream[State], e: Throwable): Unit = {} } \ No newline at end of file diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala index 346598192..81e9ecf94 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala @@ -72,8 +72,8 @@ class Summer[Key, Value: Semigroup, Event, S]( lazy val storePromise = Promise[Mergeable[Key, Value]] lazy val store = Await.result(storePromise) - lazy val sSummer: AsyncSummer[(Key, (Iterator[InputState[S]], Value)), Map[Key, (Iterator[InputState[S]], Value)]] = - summerBuilder.getSummer[Key, (Iterator[InputState[S]], Value)](implicitly[Semigroup[(Iterator[InputState[S]], Value)]]) + lazy val sSummer: AsyncSummer[(Key, (Stream[InputState[S]], Value)), Map[Key, (Stream[InputState[S]], Value)]] = + summerBuilder.getSummer[Key, (Stream[InputState[S]], Value)](implicitly[Semigroup[(Stream[InputState[S]], Value)]]) val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift) val successHandlerBox = Externalizer(successHandler) @@ -87,12 +87,12 @@ class Summer[Key, Value: Semigroup, Event, S]( successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None } - override def notifyFailure(inputs: Iterator[InputState[S]], error: Throwable): Unit = { + override def notifyFailure(inputs: Stream[InputState[S]], error: Throwable): Unit = { super.notifyFailure(inputs, error) exceptionHandlerBox.get.apply(error) } - private def handleResult(kvs: Map[Key, (Iterator[InputState[S]], Value)]): TraversableOnce[(Iterator[InputState[S]], Future[TraversableOnce[Event]])] = + private def handleResult(kvs: Map[Key, (Stream[InputState[S]], Value)]): TraversableOnce[(Stream[InputState[S]], Future[TraversableOnce[Event]])] = store.multiMerge(kvs.mapValues(_._2)).iterator.map { case (k, beforeF) => val (tups, delta) = kvs(k) @@ -111,7 +111,7 @@ class Summer[Key, Value: Semigroup, Event, S]( state.fanOut(innerTuples.size) val cacheEntries = innerTuples.map { case (k, v) => - (k, (Iterator.single(state), v)) + (k, (Stream(state), v)) } sSummer.addAll(cacheEntries).map(handleResult(_)) diff --git a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala index 4a7f89b1a..29e00b988 100644 --- a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala +++ b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala @@ -25,35 +25,35 @@ import scala.util.Try class AsyncBaseSpec extends WordSpec { val data = Seq( - (Seq(100, 104, 99).toIterator, Future(Seq(9, 10, 13))), - (Seq(12, 19).toIterator, Future(Seq(100, 200, 500)))) + (Seq(100, 104, 99).toStream, Future(Seq(9, 10, 13))), + (Seq(12, 19).toStream, Future(Seq(100, 200, 500)))) - val dequeueData = List((Seq(8, 9).toIterator, Try(Seq(4, 5, 6)))) + val dequeueData = List((Seq(8, 9).toStream, Try(Seq(4, 5, 6)))) - class TestFutureQueue extends FutureQueue[Iterator[Int], TraversableOnce[Int]]( + class TestFutureQueue extends FutureQueue[Stream[Int], TraversableOnce[Int]]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute) ) { var added = false - var addedData: (Iterator[Int], Future[TraversableOnce[Int]]) = _ - var addedAllData: TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])] = _ + var addedData: (Stream[Int], Future[TraversableOnce[Int]]) = _ + var addedAllData: TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])] = _ var dequeued = false var dequeuedCount: Int = 0 - override def add(state: Iterator[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { + override def add(state: Stream[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { assert(!added) added = true addedData = (state, fut) } override def addAll( - iter: TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { + iter: TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { assert(!added) added = true addedAllData = iter } - override def dequeue(maxItems: Int): Seq[(Iterator[Int], Try[TraversableOnce[Int]])] = synchronized { + override def dequeue(maxItems: Int): Seq[(Stream[Int], Try[TraversableOnce[Int]])] = synchronized { assert(!dequeued) dequeued = true dequeuedCount = maxItems @@ -63,8 +63,8 @@ class AsyncBaseSpec extends WordSpec { class TestAsyncBase( queue: TestFutureQueue, - tickData: => Future[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), - applyData: => Future[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( + tickData: => Future[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), + applyData: => Future[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute), MaxEmitPerExecute(57) @@ -74,7 +74,7 @@ class AsyncBaseSpec extends WordSpec { override def tick = tickData } - def promise = Promise[TraversableOnce[(Iterator[Int], Future[TraversableOnce[Int]])]] + def promise = Promise[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] "Queues tick on executeTick" in { val queue = new TestFutureQueue diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index bda5d9de5..f8f420fa5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -118,7 +118,7 @@ case class BaseBolt[I, O](jobID: JobId, logger.error(message, err) } - private def fail(inputs: Iterator[InputState[Tuple]], error: Throwable): Unit = { + private def fail(inputs: Stream[InputState[Tuple]], error: Throwable): Unit = { executor.notifyFailure(inputs, error) if (!earlyAck) { inputs.foreach(_.fail(collector.fail(_))) } logError("Storm DAG of: %d tuples failed".format(inputs.size), error) @@ -149,7 +149,7 @@ case class BaseBolt[I, O](jobID: JobId, } } - private def finish(inputs: Iterator[InputState[Tuple]], results: TraversableOnce[O]) { + private def finish(inputs: Stream[InputState[Tuple]], results: TraversableOnce[O]) { var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { From 2188e3e4dcff439be759303ced274a234f0b6c27 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Sat, 24 Dec 2016 19:24:26 -0800 Subject: [PATCH 3/8] Rename semigroup. --- .../com/twitter/summingbird/online/executor/AsyncBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala index 3dab8f24f..2ba78fe5b 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala @@ -32,7 +32,7 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti def apply(state: S, in: I): Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] def tick: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] = Future.value(Nil) - implicit def itertorSemigroup[T]: Semigroup[Stream[T]] = new Semigroup[Stream[T]] { + implicit def streamSemigroup[T]: Semigroup[Stream[T]] = new Semigroup[Stream[T]] { override def plus(l: Stream[T], r: Stream[T]): Stream[T] = l ++ r } From 8ca59c63b36b57eb19ff532d482c74658090f4e2 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Mon, 2 Jan 2017 19:28:34 -0800 Subject: [PATCH 4/8] Use chain instead of stream --- build.sbt | 5 +++- .../online/executor/AsyncBase.scala | 24 +++++++++------ .../online/executor/FinalFlatMap.scala | 21 +++++++------- .../online/executor/IntermediateFlatMap.scala | 5 ++-- .../online/executor/OperationContainer.scala | 9 +++--- .../summingbird/online/executor/Summer.scala | 12 ++++---- .../online/executor/AsyncBaseSpec.scala | 29 ++++++++++--------- .../twitter/summingbird/storm/BaseBolt.scala | 12 ++++---- 8 files changed, 67 insertions(+), 50 deletions(-) diff --git a/build.sbt b/build.sbt index eec74899e..49ff12c7b 100644 --- a/build.sbt +++ b/build.sbt @@ -32,6 +32,7 @@ val storehausVersion = "0.15.0-RC1" val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example val tormentaVersion = "0.11.1" val utilVersion = "6.34.0" +val chainVersion = "0.1.0" val extraSettings = mimaDefaultSettings ++ scalariformSettings @@ -293,7 +294,8 @@ lazy val summingbirdOnline = module("online").settings( "com.twitter" %% "chill" % chillVersion, "com.twitter" %% "storehaus-algebra" % storehausVersion, "com.twitter" %% "util-core" % utilVersion, - "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test" + "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", + "org.spire-math" %% "chain" % chainVersion ) ).dependsOn( summingbirdCore % "test->test;compile->compile", @@ -315,6 +317,7 @@ lazy val summingbirdStorm = module("storm").settings( "com.twitter" %% "scalding-args" % scaldingVersion, "com.twitter" %% "tormenta-core" % tormentaVersion, "com.twitter" %% "util-core" % utilVersion, + "org.spire-math" %% "chain" % chainVersion, stormDep % "provided" ) ).dependsOn( diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala index 2ba78fe5b..0fb953c52 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala @@ -20,6 +20,7 @@ import com.twitter.algebird.Semigroup import com.twitter.summingbird.online.FutureQueue import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import com.twitter.util._ +import chain.Chain import scala.util.Try abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I, O, S] { @@ -29,25 +30,30 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti * cases that need to complete operations after or before doing a FlatMapOperation or * doing a store merge */ - def apply(state: S, in: I): Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] - def tick: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]] = Future.value(Nil) + def apply(state: S, in: I): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]] + def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]] = Future.value(Nil) - implicit def streamSemigroup[T]: Semigroup[Stream[T]] = new Semigroup[Stream[T]] { - override def plus(l: Stream[T], r: Stream[T]): Stream[T] = l ++ r + implicit def chainSemigroup[T]: Semigroup[Chain[T]] = new Semigroup[Chain[T]] { + override def plus(l: Chain[T], r: Chain[T]): Chain[T] = l ++ r } - private[executor] lazy val futureQueue = new FutureQueue[Stream[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) + private[executor] lazy val futureQueue = new FutureQueue[Chain[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime) - override def executeTick: TraversableOnce[(Stream[S], Try[TraversableOnce[O]])] = + override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = finishExecute(None, tick) - override def execute(state: S, data: I): TraversableOnce[(Stream[S], Try[TraversableOnce[O]])] = + override def execute(state: S, data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = finishExecute(Some(state), apply(state, data)) - private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[O]])]]) = { + private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]]) = { fIn.respond { case Return(iter) => futureQueue.addAll(iter) - case Throw(ex) => futureQueue.add(failStateOpt.toStream, Future.exception(ex)) + case Throw(ex) => + val failState = failStateOpt match { + case Some(state) => Chain.single(state) + case None => Chain.Empty + } + futureQueue.add(failState, Future.exception(ex)) } futureQueue.dequeue(maxEmitPerExec.get) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala index b947106b7..b278750b7 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/FinalFlatMap.scala @@ -29,6 +29,7 @@ import com.twitter.summingbird.online.option.{ MaxFutureWaitTime, MaxEmitPerExecute } +import chain.Chain import scala.collection.mutable.{ Map => MMap, ListBuffer } // These CMaps we generate in the FFM, we use it as an immutable wrapper around // a mutable map. @@ -67,16 +68,16 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( val lockedOp = Externalizer(flatMapOp) type SummerK = Key - type SummerV = (Stream[S], Value) + type SummerV = (Chain[S], Value) - lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Stream[S], Value)]]) + lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Chain[S], Value)]]) // Lazy transient as const futures are not serializable @transient private[this] lazy val noData = List( - (List().toStream, Future.value(Nil)) + (Chain.empty, Future.value(Nil)) ) - private def formatResult(outData: Map[Key, (Stream[S], Value)]): TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])] = { + private def formatResult(outData: Map[Key, (Chain[S], Value)]): TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])] = { if (outData.isEmpty) { noData } else { @@ -86,34 +87,34 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]]( case (k, (listS, v)) => val newK = summerShards.summerIdFor(k) val (buffer, mmap) = mmMap.getOrElseUpdate(newK, (ListBuffer[S](), MMap[Key, Value]())) - buffer ++= listS + buffer ++= listS.iterator mmap += k -> v } mmMap.toIterator.map { case (outerKey, (listS, innerMap)) => - (listS.toStream, Future.value(List((outerKey, innerMap)))) + (Chain(listS), Future.value(List((outerKey, innerMap)))) } } } - override def tick: Future[TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])]] = + override def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] = sCache.tick.map(formatResult(_)) def cache(state: S, - items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Stream[S], Future[TraversableOnce[OutputElement]])]] = + items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] = try { val itemL = items.toList if (itemL.size > 0) { state.fanOut(itemL.size) sCache.addAll(itemL.map { case (k, v) => - k -> (Stream(state), v) + k -> (Chain.single(state), v) }).map(formatResult(_)) } else { // Here we handle mapping to nothing, option map et. al Future.value( List( - (Stream(state), Future.value(Nil)) + (Chain.single(state), Future.value(Nil)) ) ) } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala index 9e70418c1..ba599fa53 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/IntermediateFlatMap.scala @@ -25,6 +25,7 @@ import com.twitter.summingbird.online.option.{ MaxFutureWaitTime, MaxEmitPerExecute } +import chain.Chain class IntermediateFlatMap[T, U, S]( @transient flatMapOp: FlatMapOperation[T, U], @@ -35,9 +36,9 @@ class IntermediateFlatMap[T, U, S]( val lockedOp = Externalizer(flatMapOp) override def apply(state: S, - tup: T): Future[Iterable[(Stream[S], Future[TraversableOnce[U]])]] = + tup: T): Future[Iterable[(Chain[S], Future[TraversableOnce[U]])]] = lockedOp.get.apply(tup).map { res => - List((Stream(state), Future.value(res))) + List((Chain.single(state), Future.value(res))) } override def cleanup(): Unit = lockedOp.get.close diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala index a04384b78..70856ce2a 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/OperationContainer.scala @@ -16,14 +16,15 @@ limitations under the License. package com.twitter.summingbird.online.executor +import chain.Chain import scala.util.Try trait OperationContainer[Input, Output, State] { def init(): Unit = {} def cleanup(): Unit = {} - def executeTick: TraversableOnce[(Stream[State], Try[TraversableOnce[Output]])] + def executeTick: TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])] def execute(state: State, - data: Input): TraversableOnce[(Stream[State], Try[TraversableOnce[Output]])] - def notifyFailure(inputs: Stream[State], e: Throwable): Unit = {} -} \ No newline at end of file + data: Input): TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])] + def notifyFailure(inputs: Chain[State], e: Throwable): Unit = {} +} diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala index 81e9ecf94..d88e75635 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/Summer.scala @@ -24,6 +24,8 @@ import com.twitter.storehaus.algebra.Mergeable import com.twitter.summingbird.online.{ FlatMapOperation, Externalizer } import com.twitter.summingbird.online.option._ +import chain.Chain + // These CMaps we generate in the FFM, we use it as an immutable wrapper around // a mutable map. import scala.collection.{ Map => CMap } @@ -72,8 +74,8 @@ class Summer[Key, Value: Semigroup, Event, S]( lazy val storePromise = Promise[Mergeable[Key, Value]] lazy val store = Await.result(storePromise) - lazy val sSummer: AsyncSummer[(Key, (Stream[InputState[S]], Value)), Map[Key, (Stream[InputState[S]], Value)]] = - summerBuilder.getSummer[Key, (Stream[InputState[S]], Value)](implicitly[Semigroup[(Stream[InputState[S]], Value)]]) + lazy val sSummer: AsyncSummer[(Key, (Chain[InputState[S]], Value)), Map[Key, (Chain[InputState[S]], Value)]] = + summerBuilder.getSummer[Key, (Chain[InputState[S]], Value)](implicitly[Semigroup[(Chain[InputState[S]], Value)]]) val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift) val successHandlerBox = Externalizer(successHandler) @@ -87,12 +89,12 @@ class Summer[Key, Value: Semigroup, Event, S]( successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None } - override def notifyFailure(inputs: Stream[InputState[S]], error: Throwable): Unit = { + override def notifyFailure(inputs: Chain[InputState[S]], error: Throwable): Unit = { super.notifyFailure(inputs, error) exceptionHandlerBox.get.apply(error) } - private def handleResult(kvs: Map[Key, (Stream[InputState[S]], Value)]): TraversableOnce[(Stream[InputState[S]], Future[TraversableOnce[Event]])] = + private def handleResult(kvs: Map[Key, (Chain[InputState[S]], Value)]): TraversableOnce[(Chain[InputState[S]], Future[TraversableOnce[Event]])] = store.multiMerge(kvs.mapValues(_._2)).iterator.map { case (k, beforeF) => val (tups, delta) = kvs(k) @@ -111,7 +113,7 @@ class Summer[Key, Value: Semigroup, Event, S]( state.fanOut(innerTuples.size) val cacheEntries = innerTuples.map { case (k, v) => - (k, (Stream(state), v)) + (k, (Chain.single(state), v)) } sSummer.addAll(cacheEntries).map(handleResult(_)) diff --git a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala index 29e00b988..fa26c0ab0 100644 --- a/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala +++ b/summingbird-online/src/test/scala/com/twitter/summingbird/online/executor/AsyncBaseSpec.scala @@ -20,40 +20,41 @@ import com.twitter.conversions.time._ import com.twitter.summingbird.online.FutureQueue import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import com.twitter.util.{ Await, Future, Promise } +import chain.Chain import org.scalatest.WordSpec import scala.util.Try class AsyncBaseSpec extends WordSpec { val data = Seq( - (Seq(100, 104, 99).toStream, Future(Seq(9, 10, 13))), - (Seq(12, 19).toStream, Future(Seq(100, 200, 500)))) + (Chain(Seq(100, 104, 99)), Future(Seq(9, 10, 13))), + (Chain(Seq(12, 19)), Future(Seq(100, 200, 500)))) - val dequeueData = List((Seq(8, 9).toStream, Try(Seq(4, 5, 6)))) + val dequeueData = List((Chain(Seq(8, 9)), Try(Seq(4, 5, 6)))) - class TestFutureQueue extends FutureQueue[Stream[Int], TraversableOnce[Int]]( + class TestFutureQueue extends FutureQueue[Chain[Int], TraversableOnce[Int]]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute) ) { var added = false - var addedData: (Stream[Int], Future[TraversableOnce[Int]]) = _ - var addedAllData: TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])] = _ + var addedData: (Chain[Int], Future[TraversableOnce[Int]]) = _ + var addedAllData: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])] = _ var dequeued = false var dequeuedCount: Int = 0 - override def add(state: Stream[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { + override def add(state: Chain[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized { assert(!added) added = true addedData = (state, fut) } override def addAll( - iter: TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { + iter: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]): Unit = synchronized { assert(!added) added = true addedAllData = iter } - override def dequeue(maxItems: Int): Seq[(Stream[Int], Try[TraversableOnce[Int]])] = synchronized { + override def dequeue(maxItems: Int): Seq[(Chain[Int], Try[TraversableOnce[Int]])] = synchronized { assert(!dequeued) dequeued = true dequeuedCount = maxItems @@ -63,8 +64,8 @@ class AsyncBaseSpec extends WordSpec { class TestAsyncBase( queue: TestFutureQueue, - tickData: => Future[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), - applyData: => Future[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( + tickData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"), + applyData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int]( MaxWaitingFutures(100), MaxFutureWaitTime(1.minute), MaxEmitPerExecute(57) @@ -74,7 +75,7 @@ class AsyncBaseSpec extends WordSpec { override def tick = tickData } - def promise = Promise[TraversableOnce[(Stream[Int], Future[TraversableOnce[Int]])]] + def promise = Promise[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] "Queues tick on executeTick" in { val queue = new TestFutureQueue @@ -115,7 +116,7 @@ class AsyncBaseSpec extends WordSpec { p.setException(ex) assert(queue.added) - assert(queue.addedData._1.toList === Nil) + assert(queue.addedData._1.iterator.isEmpty) assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) }) } @@ -130,7 +131,7 @@ class AsyncBaseSpec extends WordSpec { p.setException(ex) assert(queue.added) - assert(queue.addedData._1.toList === List(1089)) + assert(queue.addedData._1 === Chain.single(1089)) assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) }) } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index f8f420fa5..ff487e992 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -29,6 +29,7 @@ import com.twitter.summingbird.online.executor.InputState import com.twitter.summingbird.option.JobId import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeStats } import com.twitter.summingbird.online.Externalizer +import chain.Chain import scala.collection.JavaConverters._ import java.util.{ List => JList } import org.slf4j.{ Logger, LoggerFactory } @@ -118,10 +119,10 @@ case class BaseBolt[I, O](jobID: JobId, logger.error(message, err) } - private def fail(inputs: Stream[InputState[Tuple]], error: Throwable): Unit = { + private def fail(inputs: Chain[InputState[Tuple]], error: Throwable): Unit = { executor.notifyFailure(inputs, error) if (!earlyAck) { inputs.foreach(_.fail(collector.fail(_))) } - logError("Storm DAG of: %d tuples failed".format(inputs.size), error) + logError("Storm DAG of: %d tuples failed".format(inputs.iterator.size), error) } override def execute(tuple: Tuple) = { @@ -149,12 +150,13 @@ case class BaseBolt[I, O](jobID: JobId, } } - private def finish(inputs: Stream[InputState[Tuple]], results: TraversableOnce[O]) { + private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) { + val tuples = inputs.iterator.map(_.state).toList var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { results.foreach { result => - collector.emit(inputs.map(_.state).toList.asJava, encoder(result)) + collector.emit(tuples.asJava, encoder(result)) emitCount += 1 } } else { // don't anchor @@ -167,7 +169,7 @@ case class BaseBolt[I, O](jobID: JobId, // Always ack a tuple on completion: if (!earlyAck) { inputs.foreach(_.ack(collector.ack(_))) } - logger.debug("bolt finished processed {} linked tuples, emitted: {}", inputs.size, emitCount) + logger.debug("bolt finished processed {} linked tuples, emitted: {}", tuples.size, emitCount) } override def prepare(conf: JMap[_, _], context: TopologyContext, oc: OutputCollector) { From 17e5e63876258227c52e573c357ffe7f2275fa9a Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Mon, 2 Jan 2017 19:44:45 -0800 Subject: [PATCH 5/8] Add mima exclusions. --- build.sbt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 49ff12c7b..93efe0272 100644 --- a/build.sbt +++ b/build.sbt @@ -236,7 +236,11 @@ val ignoredABIProblems = { exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.get"), exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.getOrElse"), exclude[DirectMissingMethodProblem]("com.twitter.summingbird.storm.BaseBolt.apply"), - exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client") + exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client"), + exclude[DirectMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"), + exclude[ReversedMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"), + exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.AsyncBase.notifyFailure"), + exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.Summer.notifyFailure") ) } From e40133b7e084b7eb75bf6dc0057373e3b163585e Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Tue, 3 Jan 2017 17:40:14 -0800 Subject: [PATCH 6/8] Avoid materializing list to get tuple count in some cases. --- .../main/scala/com/twitter/summingbird/storm/BaseBolt.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index ff487e992..247c88786 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -151,11 +151,13 @@ case class BaseBolt[I, O](jobID: JobId, } private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) { - val tuples = inputs.iterator.map(_.state).toList + var numTuples = Option.empty[Int] var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { results.foreach { result => + val tuples = inputs.iterator.map(_.state).toList + numTuples = Some(tuples.size) collector.emit(tuples.asJava, encoder(result)) emitCount += 1 } @@ -169,7 +171,7 @@ case class BaseBolt[I, O](jobID: JobId, // Always ack a tuple on completion: if (!earlyAck) { inputs.foreach(_.ack(collector.ack(_))) } - logger.debug("bolt finished processed {} linked tuples, emitted: {}", tuples.size, emitCount) + logger.debug("bolt finished processed {} linked tuples, emitted: {}", numTuples.getOrElse(inputs.iterator.size), emitCount) } override def prepare(conf: JMap[_, _], context: TopologyContext, oc: OutputCollector) { From 8e42a6217af44509affc42838efb39d9906693d6 Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Tue, 3 Jan 2017 18:09:16 -0800 Subject: [PATCH 7/8] Oscar's comments. --- .../com/twitter/summingbird/storm/BaseBolt.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index 247c88786..4ee0eee28 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -33,6 +33,7 @@ import chain.Chain import scala.collection.JavaConverters._ import java.util.{ List => JList } import org.slf4j.{ Logger, LoggerFactory } +import scala.collection.mutable.ListBuffer /** * @@ -150,14 +151,24 @@ case class BaseBolt[I, O](jobID: JobId, } } + // Avoid additional pass to get size + private def toListWithSize[A](iter: Iterator[A]): (List[A], Int) = { + val lb = new ListBuffer[A] + for (a <- iter) { + lb += a + } + (lb.toList, lb.size) + } + private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) { var numTuples = Option.empty[Int] var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { + val states = inputs.iterator.map(_.state) + val (tuples, count) = toListWithSize(states) + numTuples = Some(count) results.foreach { result => - val tuples = inputs.iterator.map(_.state).toList - numTuples = Some(tuples.size) collector.emit(tuples.asJava, encoder(result)) emitCount += 1 } From a90c9da9e96b6315a698e4db089939c06bfa186f Mon Sep 17 00:00:00 2001 From: Pankaj Gupta Date: Tue, 3 Jan 2017 19:15:31 -0800 Subject: [PATCH 8/8] Simplify --- .../twitter/summingbird/storm/BaseBolt.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index 4ee0eee28..46e46344c 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -33,7 +33,6 @@ import chain.Chain import scala.collection.JavaConverters._ import java.util.{ List => JList } import org.slf4j.{ Logger, LoggerFactory } -import scala.collection.mutable.ListBuffer /** * @@ -151,25 +150,13 @@ case class BaseBolt[I, O](jobID: JobId, } } - // Avoid additional pass to get size - private def toListWithSize[A](iter: Iterator[A]): (List[A], Int) = { - val lb = new ListBuffer[A] - for (a <- iter) { - lb += a - } - (lb.toList, lb.size) - } - private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) { - var numTuples = Option.empty[Int] var emitCount = 0 if (hasDependants) { if (anchorTuples.anchor) { - val states = inputs.iterator.map(_.state) - val (tuples, count) = toListWithSize(states) - numTuples = Some(count) + val states = inputs.iterator.map(_.state).toList.asJava results.foreach { result => - collector.emit(tuples.asJava, encoder(result)) + collector.emit(states, encoder(result)) emitCount += 1 } } else { // don't anchor @@ -182,7 +169,9 @@ case class BaseBolt[I, O](jobID: JobId, // Always ack a tuple on completion: if (!earlyAck) { inputs.foreach(_.ack(collector.ack(_))) } - logger.debug("bolt finished processed {} linked tuples, emitted: {}", numTuples.getOrElse(inputs.iterator.size), emitCount) + if (logger.isDebugEnabled()) { + logger.debug("bolt finished processed {} linked tuples, emitted: {}", inputs.iterator.size, emitCount) + } } override def prepare(conf: JMap[_, _], context: TopologyContext, oc: OutputCollector) {