From 1b1f86fe92edbe42519a870cd9f17ac9da225eee Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Mon, 26 Jun 2017 17:58:02 -0700 Subject: [PATCH 1/4] Refactor `SummerBuilder` to make it possible to use `Counters` on `Summer` creation --- .../summingbird/online/option/AllOpts.scala | 14 +- .../summingbird/online/option/Summers.scala | 124 ++++++++++++++++++ .../summingbird/storm/BuildSummer.scala | 122 +++++------------ 3 files changed, 168 insertions(+), 92 deletions(-) create mode 100644 summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala index f27f7c3a0..f67798ea5 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala @@ -2,7 +2,8 @@ package com.twitter.summingbird.online.option import com.twitter.util.Duration import com.twitter.algebird.Semigroup -import com.twitter.algebird.util.summer.AsyncSummer +import com.twitter.algebird.util.summer.{ AsyncSummer, Incrementor } +import com.twitter.summingbird.{ Counter, Name } case class OnlineSuccessHandler(handlerFn: Unit => Unit) @@ -98,8 +99,17 @@ trait SummerBuilder extends Serializable { /** * The SummerConstructor option, set this instead of CacheSize, AsyncPoolSize, etc.. to provide how to construct the aggregation for this bolt + * @see [[Summers]] for useful [[SummerConstructor]]s. */ -case class SummerConstructor(get: SummerBuilder) +case class SummerConstructor(get: SummerConstructor.Context => SummerBuilder) + +object SummerConstructor { + def apply(get: SummerBuilder): SummerConstructor = SummerConstructor(_ => get) + + trait Context { + def counter(name: Name): Counter with Incrementor + } +} /** * How many instances/tasks of this flatmap task should be spawned in the environment diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala new file mode 100644 index 000000000..34558e035 --- /dev/null +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala @@ -0,0 +1,124 @@ +package com.twitter.summingbird.online.option + +import com.twitter.algebird.Semigroup +import com.twitter.algebird.util.summer._ +import com.twitter.summingbird.Name +import com.twitter.summingbird.online.OnlineDefaultConstants._ +import com.twitter.summingbird.option.CacheSize +import com.twitter.util.{ Future, FuturePool } +import java.util.concurrent.{ Executors, TimeUnit } + +object Summers { + val MemoryCounterName = Name("memory") + val TimeoutCounterName = Name("timeout") + val SizeCounterName = Name("size") + val TuplesInCounterName = Name("tuplesIn") + val TuplesOutCounterName = Name("tuplesOut") + val InsertCounterName = Name("inserts") + val InsertFailCounterName = Name("insertFail") + + val Null = new SummerConstructor(NullConstructor) + + def sync( + cacheSize: CacheSize = DEFAULT_FM_CACHE, + flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, + softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT + ): SummerConstructor = new SummerConstructor(SyncConstructor(cacheSize, flushFrequency, softMemoryFlushPercent)) + + def async( + cacheSize: CacheSize = DEFAULT_FM_CACHE, + flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, + softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT, + asyncPoolSize: AsyncPoolSize = DEFAULT_ASYNC_POOL_SIZE, + compactValues: CompactValues = CompactValues.default, + valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE + ): SummerConstructor = new SummerConstructor(AsyncConstructor( + cacheSize, flushFrequency, softMemoryFlushPercent, asyncPoolSize, compactValues, valueCombinerCacheSize + )) + + private object NullConstructor extends (SummerConstructor.Context => SummerBuilder) { + override def apply(ctx: SummerConstructor.Context): SummerBuilder = { + val tuplesIn = ctx.counter(TuplesInCounterName) + val tuplesOut = ctx.counter(TuplesOutCounterName) + new SummerBuilder { + override def getSummer[K, V: Semigroup]: AsyncSummer[(K, V), Map[K, V]] = + new com.twitter.algebird.util.summer.NullSummer[K, V](tuplesIn, tuplesOut) + } + } + } + + private case class SyncConstructor( + cacheSize: CacheSize, + flushFrequency: FlushFrequency, + softMemoryFlushPercent: SoftMemoryFlushPercent + ) extends (SummerConstructor.Context => SummerBuilder) { + override def apply(ctx: SummerConstructor.Context): SummerBuilder = { + val memoryCounter = ctx.counter(MemoryCounterName) + val timeoutCounter = ctx.counter(TimeoutCounterName) + val sizeCounter = ctx.counter(SizeCounterName) + val tupleInCounter = ctx.counter(TuplesInCounterName) + val tupleOutCounter = ctx.counter(TuplesOutCounterName) + val insertCounter = ctx.counter(InsertCounterName) + + new SummerBuilder { + def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { + new SyncSummingQueue[K, V]( + BufferSize(cacheSize.lowerBound), + com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get), + MemoryFlushPercent(softMemoryFlushPercent.get), + memoryCounter, + timeoutCounter, + sizeCounter, + insertCounter, + tupleInCounter, + tupleOutCounter) + } + } + } + } + + private case class AsyncConstructor( + cacheSize: CacheSize, + flushFrequency: FlushFrequency, + softMemoryFlushPercent: SoftMemoryFlushPercent, + asyncPoolSize: AsyncPoolSize, + compactValues: CompactValues, + valueCombinerCacheSize: ValueCombinerCacheSize + ) extends (SummerConstructor.Context => SummerBuilder) { + override def apply(ctx: SummerConstructor.Context): SummerBuilder = { + val memoryCounter = ctx.counter(MemoryCounterName) + val timeoutCounter = ctx.counter(TimeoutCounterName) + val sizeCounter = ctx.counter(SizeCounterName) + val tupleInCounter = ctx.counter(TuplesInCounterName) + val tupleOutCounter = ctx.counter(TuplesOutCounterName) + val insertCounter = ctx.counter(InsertCounterName) + val insertFailCounter = ctx.counter(InsertFailCounterName) + + new SummerBuilder { + def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { + val executor = Executors.newFixedThreadPool(asyncPoolSize.get) + val futurePool = FuturePool(executor) + val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound), + com.twitter.algebird.util.summer.FlushFrequency(flushFrequency.get), + MemoryFlushPercent(softMemoryFlushPercent.get), + memoryCounter, + timeoutCounter, + insertCounter, + insertFailCounter, + sizeCounter, + tupleInCounter, + tupleOutCounter, + futurePool, + Compact(compactValues.toBoolean), + CompactionSize(valueCombinerCacheSize.get)) + summer.withCleanup(() => { + Future { + executor.shutdown + executor.awaitTermination(10, TimeUnit.SECONDS) + } + }) + } + } + } + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index 37ef02bb4..9eb0bbbb5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -16,16 +16,13 @@ package com.twitter.summingbird.storm -import com.twitter.algebird.Semigroup import com.twitter.algebird.util.summer._ +import com.twitter.summingbird.online.OnlineDefaultConstants._ import com.twitter.summingbird.{ Counter, Group, Name } -import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor } -import com.twitter.summingbird.option.JobId +import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor, Summers } import com.twitter.summingbird.storm.planner.StormNode -import com.twitter.util.{ Future, FuturePool } -import java.util.concurrent.{ Executors, TimeUnit } import org.slf4j.LoggerFactory -import Constants._ +import scala.reflect.ClassTag /* * The BuildSummer class is responsible for decoding from the options what SummerBuilder to use when setting up bolts. @@ -33,103 +30,48 @@ import Constants._ * Reading all the options internally. */ private[storm] object BuildSummer { - @transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass) + @transient private[storm] val logger = LoggerFactory.getLogger(BuildSummer.getClass) - def apply(builder: StormTopologyBuilder, node: StormNode) = { - val opSummerConstructor = builder.get[SummerConstructor](node).map(_._2) - logger.debug(s"Node (${builder.getNodeName(node)}): Queried for SummerConstructor, got $opSummerConstructor") - - opSummerConstructor match { - case Some(cons) => - logger.debug(s"Node (${builder.getNodeName(node)}): Using user supplied SummerConstructor: $cons") - cons.get - case None => legacyBuilder(builder, node) + def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = { + val summerConstructor = builder.get[SummerConstructor](node) + .map { case (_, constructor) => constructor }.getOrElse { + logger.debug(s"Node (${builder.getNodeName(node)}): Use legacy way of getting summer constructor.") + legacySummerConstructor(builder, node) } + + logger.debug(s"Node (${builder.getNodeName(node)}): Use $summerConstructor as summer constructor.") + summerConstructor.get.apply(NodeContext(builder, node)) } - private[this] final def legacyBuilder(builder: StormTopologyBuilder, node: StormNode) = { - val nodeName = builder.getNodeName(node) - val cacheSize = builder.getOrElse(node, DEFAULT_FM_CACHE) - val jobId = builder.jobId - require(jobId.get != null, "Unable to register metrics with no job id present in the config updater") - logger.info(s"[$nodeName] cacheSize lowerbound: ${cacheSize.lowerBound}") + private def legacySummerConstructor(builder: StormTopologyBuilder, node: StormNode): SummerConstructor = { + def option[T <: AnyRef: ClassTag](default: T): T = builder.getOrElse[T](node, default) - val memoryCounter = counter(jobId, Group(nodeName), Name("memory")) - val timeoutCounter = counter(jobId, Group(nodeName), Name("timeout")) - val sizeCounter = counter(jobId, Group(nodeName), Name("size")) - val tupleInCounter = counter(jobId, Group(nodeName), Name("tuplesIn")) - val tupleOutCounter = counter(jobId, Group(nodeName), Name("tuplesOut")) - val insertCounter = counter(jobId, Group(nodeName), Name("inserts")) - val insertFailCounter = counter(jobId, Group(nodeName), Name("insertFail")) + val cacheSize = option(DEFAULT_FM_CACHE) if (cacheSize.lowerBound == 0) { - new SummerBuilder { - def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { - new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter) - } - } + Summers.Null } else { - val softMemoryFlush = builder.getOrElse(node, DEFAULT_SOFT_MEMORY_FLUSH_PERCENT) - logger.info(s"[$nodeName] softMemoryFlush : ${softMemoryFlush.get}") - - val flushFrequency = builder.getOrElse(node, DEFAULT_FLUSH_FREQUENCY) - logger.info(s"[$nodeName] maxWaiting: ${flushFrequency.get}") - - val useAsyncCache = builder.getOrElse(node, DEFAULT_USE_ASYNC_CACHE) - logger.info(s"[$nodeName] useAsyncCache : ${useAsyncCache.get}") + val softMemoryFlush = option(DEFAULT_SOFT_MEMORY_FLUSH_PERCENT) + val flushFrequency = option(DEFAULT_FLUSH_FREQUENCY) + val useAsyncCache = option(DEFAULT_USE_ASYNC_CACHE) if (!useAsyncCache.get) { - new SummerBuilder { - def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { - new SyncSummingQueue[K, V]( - BufferSize(cacheSize.lowerBound), - FlushFrequency(flushFrequency.get), - MemoryFlushPercent(softMemoryFlush.get), - memoryCounter, - timeoutCounter, - sizeCounter, - insertCounter, - tupleInCounter, - tupleOutCounter) - } - } + Summers.sync(cacheSize, flushFrequency, softMemoryFlush) } else { - val asyncPoolSize = builder.getOrElse(node, DEFAULT_ASYNC_POOL_SIZE) - logger.info(s"[$nodeName] asyncPoolSize : ${asyncPoolSize.get}") - - val valueCombinerCrushSize = builder.getOrElse(node, DEFAULT_VALUE_COMBINER_CACHE_SIZE) - logger.info(s"[$nodeName] valueCombinerCrushSize : ${valueCombinerCrushSize.get}") - - val doCompact = builder.getOrElse(node, CompactValues.default) - - new SummerBuilder { - def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { - val executor = Executors.newFixedThreadPool(asyncPoolSize.get) - val futurePool = FuturePool(executor) - val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound), - FlushFrequency(flushFrequency.get), - MemoryFlushPercent(softMemoryFlush.get), - memoryCounter, - timeoutCounter, - insertCounter, - insertFailCounter, - sizeCounter, - tupleInCounter, - tupleOutCounter, - futurePool, - Compact(doCompact.toBoolean), - CompactionSize(valueCombinerCrushSize.get)) - summer.withCleanup(() => { - Future { - executor.shutdown - executor.awaitTermination(10, TimeUnit.SECONDS) - } - }) - } - } + val asyncPoolSize = option(DEFAULT_ASYNC_POOL_SIZE) + val valueCombinerCrushSize = option(DEFAULT_VALUE_COMBINER_CACHE_SIZE) + val doCompact = option(CompactValues.default) + Summers.async( + cacheSize, flushFrequency, softMemoryFlush, asyncPoolSize, doCompact, valueCombinerCrushSize + ) } } } - def counter(jobID: JobId, nodeName: Group, counterName: Name) = new Counter(Group("summingbird." + nodeName.getString), counterName)(jobID) with Incrementor + private case class NodeContext(builder: StormTopologyBuilder, node: StormNode) extends SummerConstructor.Context { + override def counter(name: Name): Counter with Incrementor = { + require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater") + new Counter(Group("summingbird." + builder.getNodeName(node)), name)(builder.jobId) with Incrementor + } + } } From c6bb6ac1384bb51e9e3ed28e5e10581f61c30bec Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Tue, 27 Jun 2017 00:10:56 -0700 Subject: [PATCH 2/4] Introduce `summerConstructorSpec` to make logging output nicer --- .../summingbird/online/option/AllOpts.scala | 13 ++++-- .../summingbird/online/option/Summers.scala | 44 +++++++++---------- .../summingbird/storm/BuildSummer.scala | 22 ++++------ 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala index f67798ea5..c04d9b3cd 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala @@ -101,13 +101,18 @@ trait SummerBuilder extends Serializable { * The SummerConstructor option, set this instead of CacheSize, AsyncPoolSize, etc.. to provide how to construct the aggregation for this bolt * @see [[Summers]] for useful [[SummerConstructor]]s. */ -case class SummerConstructor(get: SummerConstructor.Context => SummerBuilder) +case class SummerConstructor(get: SummerConstructorSpec) + +trait SummerConstructorSpec { + def builder(counter: Name => Counter with Incrementor): SummerBuilder +} object SummerConstructor { - def apply(get: SummerBuilder): SummerConstructor = SummerConstructor(_ => get) + def apply(get: SummerBuilder): SummerConstructor = + SummerConstructor(DeprecatedSummerConstructorSpec(get)) - trait Context { - def counter(name: Name): Counter with Incrementor + private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerConstructorSpec { + override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = get } } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala index 34558e035..2952b29e9 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala @@ -2,7 +2,7 @@ package com.twitter.summingbird.online.option import com.twitter.algebird.Semigroup import com.twitter.algebird.util.summer._ -import com.twitter.summingbird.Name +import com.twitter.summingbird.{ Counter, Name } import com.twitter.summingbird.online.OnlineDefaultConstants._ import com.twitter.summingbird.option.CacheSize import com.twitter.util.{ Future, FuturePool } @@ -36,10 +36,10 @@ object Summers { cacheSize, flushFrequency, softMemoryFlushPercent, asyncPoolSize, compactValues, valueCombinerCacheSize )) - private object NullConstructor extends (SummerConstructor.Context => SummerBuilder) { - override def apply(ctx: SummerConstructor.Context): SummerBuilder = { - val tuplesIn = ctx.counter(TuplesInCounterName) - val tuplesOut = ctx.counter(TuplesOutCounterName) + private case object NullConstructor extends SummerConstructorSpec { + override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + val tuplesIn = counter(TuplesInCounterName) + val tuplesOut = counter(TuplesOutCounterName) new SummerBuilder { override def getSummer[K, V: Semigroup]: AsyncSummer[(K, V), Map[K, V]] = new com.twitter.algebird.util.summer.NullSummer[K, V](tuplesIn, tuplesOut) @@ -51,14 +51,14 @@ object Summers { cacheSize: CacheSize, flushFrequency: FlushFrequency, softMemoryFlushPercent: SoftMemoryFlushPercent - ) extends (SummerConstructor.Context => SummerBuilder) { - override def apply(ctx: SummerConstructor.Context): SummerBuilder = { - val memoryCounter = ctx.counter(MemoryCounterName) - val timeoutCounter = ctx.counter(TimeoutCounterName) - val sizeCounter = ctx.counter(SizeCounterName) - val tupleInCounter = ctx.counter(TuplesInCounterName) - val tupleOutCounter = ctx.counter(TuplesOutCounterName) - val insertCounter = ctx.counter(InsertCounterName) + ) extends SummerConstructorSpec { + override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + val memoryCounter = counter(MemoryCounterName) + val timeoutCounter = counter(TimeoutCounterName) + val sizeCounter = counter(SizeCounterName) + val tupleInCounter = counter(TuplesInCounterName) + val tupleOutCounter = counter(TuplesOutCounterName) + val insertCounter = counter(InsertCounterName) new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { @@ -84,15 +84,15 @@ object Summers { asyncPoolSize: AsyncPoolSize, compactValues: CompactValues, valueCombinerCacheSize: ValueCombinerCacheSize - ) extends (SummerConstructor.Context => SummerBuilder) { - override def apply(ctx: SummerConstructor.Context): SummerBuilder = { - val memoryCounter = ctx.counter(MemoryCounterName) - val timeoutCounter = ctx.counter(TimeoutCounterName) - val sizeCounter = ctx.counter(SizeCounterName) - val tupleInCounter = ctx.counter(TuplesInCounterName) - val tupleOutCounter = ctx.counter(TuplesOutCounterName) - val insertCounter = ctx.counter(InsertCounterName) - val insertFailCounter = ctx.counter(InsertFailCounterName) + ) extends SummerConstructorSpec { + override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + val memoryCounter = counter(MemoryCounterName) + val timeoutCounter = counter(TimeoutCounterName) + val sizeCounter = counter(SizeCounterName) + val tupleInCounter = counter(TuplesInCounterName) + val tupleOutCounter = counter(TuplesOutCounterName) + val insertCounter = counter(InsertCounterName) + val insertFailCounter = counter(InsertFailCounterName) new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index 9eb0bbbb5..83982322d 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -18,7 +18,7 @@ package com.twitter.summingbird.storm import com.twitter.algebird.util.summer._ import com.twitter.summingbird.online.OnlineDefaultConstants._ -import com.twitter.summingbird.{ Counter, Group, Name } +import com.twitter.summingbird.{ Counter, Group } import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor, Summers } import com.twitter.summingbird.storm.planner.StormNode import org.slf4j.LoggerFactory @@ -33,14 +33,17 @@ private[storm] object BuildSummer { @transient private[storm] val logger = LoggerFactory.getLogger(BuildSummer.getClass) def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = { - val summerConstructor = builder.get[SummerConstructor](node) + val summerConstructorSpec = builder.get[SummerConstructor](node) .map { case (_, constructor) => constructor }.getOrElse { - logger.debug(s"Node (${builder.getNodeName(node)}): Use legacy way of getting summer constructor.") + logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer constructor") legacySummerConstructor(builder, node) - } + }.get - logger.debug(s"Node (${builder.getNodeName(node)}): Use $summerConstructor as summer constructor.") - summerConstructor.get.apply(NodeContext(builder, node)) + logger.info(s"[${builder.getNodeName(node)}] summer constructor spec: $summerConstructorSpec") + summerConstructorSpec.builder { counterName => + require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater") + new Counter(Group("summingbird." + builder.getNodeName(node)), counterName)(builder.jobId) with Incrementor + } } private def legacySummerConstructor(builder: StormTopologyBuilder, node: StormNode): SummerConstructor = { @@ -67,11 +70,4 @@ private[storm] object BuildSummer { } } } - - private case class NodeContext(builder: StormTopologyBuilder, node: StormNode) extends SummerConstructor.Context { - override def counter(name: Name): Counter with Incrementor = { - require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater") - new Counter(Group("summingbird." + builder.getNodeName(node)), name)(builder.jobId) with Incrementor - } - } } From 1f8a74452df0da9339a2ede7b9788c3e6654fa71 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Tue, 27 Jun 2017 13:44:06 -0700 Subject: [PATCH 3/4] SummerConstructorSpec -> SummerWithCountersBuilder --- .../summingbird/online/option/AllOpts.scala | 12 ++--- .../summingbird/online/option/Summers.scala | 53 ++++++------------- .../summingbird/storm/BuildSummer.scala | 22 ++++---- 3 files changed, 34 insertions(+), 53 deletions(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala index c04d9b3cd..ec14e5511 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala @@ -99,20 +99,20 @@ trait SummerBuilder extends Serializable { /** * The SummerConstructor option, set this instead of CacheSize, AsyncPoolSize, etc.. to provide how to construct the aggregation for this bolt - * @see [[Summers]] for useful [[SummerConstructor]]s. + * @see [[Summers]] for useful [[SummerWithCountersBuilder]]s. */ -case class SummerConstructor(get: SummerConstructorSpec) +case class SummerConstructor(get: SummerWithCountersBuilder) -trait SummerConstructorSpec { - def builder(counter: Name => Counter with Incrementor): SummerBuilder +trait SummerWithCountersBuilder { + def create(counter: Name => Counter with Incrementor): SummerBuilder } object SummerConstructor { def apply(get: SummerBuilder): SummerConstructor = SummerConstructor(DeprecatedSummerConstructorSpec(get)) - private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerConstructorSpec { - override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = get + private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerWithCountersBuilder { + override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = get } } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala index 2952b29e9..26481fed3 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala @@ -17,27 +17,8 @@ object Summers { val InsertCounterName = Name("inserts") val InsertFailCounterName = Name("insertFail") - val Null = new SummerConstructor(NullConstructor) - - def sync( - cacheSize: CacheSize = DEFAULT_FM_CACHE, - flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, - softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT - ): SummerConstructor = new SummerConstructor(SyncConstructor(cacheSize, flushFrequency, softMemoryFlushPercent)) - - def async( - cacheSize: CacheSize = DEFAULT_FM_CACHE, - flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, - softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT, - asyncPoolSize: AsyncPoolSize = DEFAULT_ASYNC_POOL_SIZE, - compactValues: CompactValues = CompactValues.default, - valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE - ): SummerConstructor = new SummerConstructor(AsyncConstructor( - cacheSize, flushFrequency, softMemoryFlushPercent, asyncPoolSize, compactValues, valueCombinerCacheSize - )) - - private case object NullConstructor extends SummerConstructorSpec { - override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + case object Null extends SummerWithCountersBuilder { + override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { val tuplesIn = counter(TuplesInCounterName) val tuplesOut = counter(TuplesOutCounterName) new SummerBuilder { @@ -47,12 +28,12 @@ object Summers { } } - private case class SyncConstructor( - cacheSize: CacheSize, - flushFrequency: FlushFrequency, - softMemoryFlushPercent: SoftMemoryFlushPercent - ) extends SummerConstructorSpec { - override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + case class Sync( + cacheSize: CacheSize = DEFAULT_FM_CACHE, + flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, + softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT + ) extends SummerWithCountersBuilder { + override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { val memoryCounter = counter(MemoryCounterName) val timeoutCounter = counter(TimeoutCounterName) val sizeCounter = counter(SizeCounterName) @@ -77,15 +58,15 @@ object Summers { } } - private case class AsyncConstructor( - cacheSize: CacheSize, - flushFrequency: FlushFrequency, - softMemoryFlushPercent: SoftMemoryFlushPercent, - asyncPoolSize: AsyncPoolSize, - compactValues: CompactValues, - valueCombinerCacheSize: ValueCombinerCacheSize - ) extends SummerConstructorSpec { - override def builder(counter: (Name) => Counter with Incrementor): SummerBuilder = { + case class Async( + cacheSize: CacheSize = DEFAULT_FM_CACHE, + flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, + softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT, + asyncPoolSize: AsyncPoolSize = DEFAULT_ASYNC_POOL_SIZE, + compactValues: CompactValues = CompactValues.default, + valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE + ) extends SummerWithCountersBuilder { + override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { val memoryCounter = counter(MemoryCounterName) val timeoutCounter = counter(TimeoutCounterName) val sizeCounter = counter(SizeCounterName) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index 83982322d..f8f7ae8ec 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -19,7 +19,7 @@ package com.twitter.summingbird.storm import com.twitter.algebird.util.summer._ import com.twitter.summingbird.online.OnlineDefaultConstants._ import com.twitter.summingbird.{ Counter, Group } -import com.twitter.summingbird.online.option.{ CompactValues, SummerBuilder, SummerConstructor, Summers } +import com.twitter.summingbird.online.option._ import com.twitter.summingbird.storm.planner.StormNode import org.slf4j.LoggerFactory import scala.reflect.ClassTag @@ -33,20 +33,20 @@ private[storm] object BuildSummer { @transient private[storm] val logger = LoggerFactory.getLogger(BuildSummer.getClass) def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = { - val summerConstructorSpec = builder.get[SummerConstructor](node) - .map { case (_, constructor) => constructor }.getOrElse { - logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer constructor") - legacySummerConstructor(builder, node) - }.get + val summerBuilder = builder.get[SummerConstructor](node) + .map { case (_, constructor) => constructor.get }.getOrElse { + logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer builder") + legacySummerBuilder(builder, node) + } - logger.info(s"[${builder.getNodeName(node)}] summer constructor spec: $summerConstructorSpec") - summerConstructorSpec.builder { counterName => + logger.info(s"[${builder.getNodeName(node)}] summer builder: $summerBuilder") + summerBuilder.create { counterName => require(builder.jobId.get != null, "Unable to register metrics with no job id present in the config updater") new Counter(Group("summingbird." + builder.getNodeName(node)), counterName)(builder.jobId) with Incrementor } } - private def legacySummerConstructor(builder: StormTopologyBuilder, node: StormNode): SummerConstructor = { + private def legacySummerBuilder(builder: StormTopologyBuilder, node: StormNode): SummerWithCountersBuilder = { def option[T <: AnyRef: ClassTag](default: T): T = builder.getOrElse[T](node, default) val cacheSize = option(DEFAULT_FM_CACHE) @@ -59,12 +59,12 @@ private[storm] object BuildSummer { val useAsyncCache = option(DEFAULT_USE_ASYNC_CACHE) if (!useAsyncCache.get) { - Summers.sync(cacheSize, flushFrequency, softMemoryFlush) + Summers.Sync(cacheSize, flushFrequency, softMemoryFlush) } else { val asyncPoolSize = option(DEFAULT_ASYNC_POOL_SIZE) val valueCombinerCrushSize = option(DEFAULT_VALUE_COMBINER_CACHE_SIZE) val doCompact = option(CompactValues.default) - Summers.async( + Summers.Async( cacheSize, flushFrequency, softMemoryFlush, asyncPoolSize, doCompact, valueCombinerCrushSize ) } From 22532c0dece3d9c7bd95c7ecf77b73e9b398f894 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Wed, 28 Jun 2017 19:46:07 -0700 Subject: [PATCH 4/4] Pankaj's and Oscar's comments --- .../summingbird/online/option/AllOpts.scala | 8 ++++++-- .../summingbird/online/option/Summers.scala | 8 ++++---- .../summingbird/storm/BuildSummer.scala | 18 ++++++++++++------ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala index ec14e5511..a490407b8 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/AllOpts.scala @@ -103,8 +103,12 @@ trait SummerBuilder extends Serializable { */ case class SummerConstructor(get: SummerWithCountersBuilder) +/** + * Returned [[SummerBuilder]] should be [[Serializable]], while [[SummerWithCountersBuilder]] + * should be used only on submitter node. + */ trait SummerWithCountersBuilder { - def create(counter: Name => Counter with Incrementor): SummerBuilder + def create(counter: Name => Incrementor): SummerBuilder } object SummerConstructor { @@ -112,7 +116,7 @@ object SummerConstructor { SummerConstructor(DeprecatedSummerConstructorSpec(get)) private case class DeprecatedSummerConstructorSpec(get: SummerBuilder) extends SummerWithCountersBuilder { - override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = get + override def create(counter: (Name) => Incrementor): SummerBuilder = get } } diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala index 26481fed3..54f2453b8 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/option/Summers.scala @@ -2,7 +2,7 @@ package com.twitter.summingbird.online.option import com.twitter.algebird.Semigroup import com.twitter.algebird.util.summer._ -import com.twitter.summingbird.{ Counter, Name } +import com.twitter.summingbird.Name import com.twitter.summingbird.online.OnlineDefaultConstants._ import com.twitter.summingbird.option.CacheSize import com.twitter.util.{ Future, FuturePool } @@ -18,7 +18,7 @@ object Summers { val InsertFailCounterName = Name("insertFail") case object Null extends SummerWithCountersBuilder { - override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { + override def create(counter: (Name) => Incrementor): SummerBuilder = { val tuplesIn = counter(TuplesInCounterName) val tuplesOut = counter(TuplesOutCounterName) new SummerBuilder { @@ -33,7 +33,7 @@ object Summers { flushFrequency: FlushFrequency = DEFAULT_FLUSH_FREQUENCY, softMemoryFlushPercent: SoftMemoryFlushPercent = DEFAULT_SOFT_MEMORY_FLUSH_PERCENT ) extends SummerWithCountersBuilder { - override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { + override def create(counter: (Name) => Incrementor): SummerBuilder = { val memoryCounter = counter(MemoryCounterName) val timeoutCounter = counter(TimeoutCounterName) val sizeCounter = counter(SizeCounterName) @@ -66,7 +66,7 @@ object Summers { compactValues: CompactValues = CompactValues.default, valueCombinerCacheSize: ValueCombinerCacheSize = DEFAULT_VALUE_COMBINER_CACHE_SIZE ) extends SummerWithCountersBuilder { - override def create(counter: (Name) => Counter with Incrementor): SummerBuilder = { + override def create(counter: (Name) => Incrementor): SummerBuilder = { val memoryCounter = counter(MemoryCounterName) val timeoutCounter = counter(TimeoutCounterName) val sizeCounter = counter(SizeCounterName) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index f8f7ae8ec..f9def39c4 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -16,7 +16,7 @@ package com.twitter.summingbird.storm -import com.twitter.algebird.util.summer._ +import com.twitter.algebird.util.summer.Incrementor import com.twitter.summingbird.online.OnlineDefaultConstants._ import com.twitter.summingbird.{ Counter, Group } import com.twitter.summingbird.online.option._ @@ -34,10 +34,11 @@ private[storm] object BuildSummer { def apply(builder: StormTopologyBuilder, node: StormNode): SummerBuilder = { val summerBuilder = builder.get[SummerConstructor](node) - .map { case (_, constructor) => constructor.get }.getOrElse { - logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer builder") - legacySummerBuilder(builder, node) - } + .map { case (_, constructor) => constructor.get } + .getOrElse { + logger.info(s"[${builder.getNodeName(node)}] use legacy way of getting summer builder") + legacySummerBuilder(builder, node) + } logger.info(s"[${builder.getNodeName(node)}] summer builder: $summerBuilder") summerBuilder.create { counterName => @@ -65,7 +66,12 @@ private[storm] object BuildSummer { val valueCombinerCrushSize = option(DEFAULT_VALUE_COMBINER_CACHE_SIZE) val doCompact = option(CompactValues.default) Summers.Async( - cacheSize, flushFrequency, softMemoryFlush, asyncPoolSize, doCompact, valueCombinerCrushSize + cacheSize, + flushFrequency, + softMemoryFlush, + asyncPoolSize, + doCompact, + valueCombinerCrushSize ) } }