Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Track state in summingbird-online as an Iterator rather than a Seq. #703

Merged
merged 8 commits into from
Jan 4, 2017

Conversation

pankajroark
Copy link
Contributor

Fix for #689 . This should avoid n^2 compute complexity when summing single element lists of Storm tuples.

…his should avoid n^2 compute comlexity when summing single element lists of Storm tuples.
@johnynek
Copy link
Collaborator

could we use Stream instead? the mutability of an Iterator makes it pretty costly to verify that it is not buggy with a code review. I really hate to have a mutable object on an API unless the performance is really necessitating it?

@pankajroark
Copy link
Contributor Author

Good idea. Changed to use stream instead of iterator.

@pankajroark
Copy link
Contributor Author

tbh Iterator is 4x faster than stream in this microbenchmark but both are way faster than List, so stream seems fine:
val s = (0 to 10000).toList

@benchmark
def listConcat(): List[Int] = {
s ++ List(0)
}

@benchmark
def streamConcat(): Stream[Int] = {
s.toStream ++ Stream(0)
}

@benchmark
def iterConcat(): Iterator[Int] = {
s.toIterator ++ Iterator.single(0)
}

Results:
[info] ToBenchmark.iterConcat thrpt 4 85182247.256 ± 5840222.962 ops/s
[info] ToBenchmark.listConcat thrpt 4 9937.918 ± 26498.037 ops/s
[info] ToBenchmark.streamConcat thrpt 4 22019587.193 ± 7199199.008 ops/s

@johnynek
Copy link
Collaborator

johnynek commented Dec 25, 2016 via email

@pankajroark
Copy link
Contributor Author

I tried Batched but the code started getting complicated because of https://github.com/twitter/summingbird/blob/develop/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala#L46

We need to be able to pass an empty Batched here. Batched itself doesn't have a zero value it relies on the contained type. So we either have to use Option[Batched] which wouldn't be good for performance, always having to wrap/unwrap. Or we have to have the monoid of S available in this abstract class, which would require further code changes.

@pankajroark
Copy link
Contributor Author

Just for comparison iterator and Batched are indeed comparable in perf. I'd really like to use Batched if we could find a simple way or iterator otherwise. Combining input state is in the hot path.:
val s = (0 to 10000).toList
val bs = Batched.items(s).get
val is = s.toIterator
val ss = s.toStream

@benchmark
def listConcat(): List[Int] = {
s ++ List(0)
}

@benchmark
def streamConcat(): Stream[Int] = {
ss ++ Stream(0)
}

@benchmark
def iterConcat(): Iterator[Int] = {
is ++ Iterator.single(0)
}

@benchmark
def batchedConcat(): Batched[Int] = {
bs.combine(Batched(0))
}

[info] ToBenchmark.batchedConcat thrpt 4 126300963.122 ± 46804561.489 ops/s
[info] ToBenchmark.iterConcat thrpt 4 121149476.337 ± 46339829.598 ops/s
[info] ToBenchmark.listConcat thrpt 4 9380.640 ± 23140.126 ops/s
[info] ToBenchmark.streamConcat thrpt 4 24118157.945 ± 7172767.691 ops/s

@pankajroark
Copy link
Contributor Author

Any suggestions on the next steps here. I'm ok with Stream, it's still much better than List in this case.

@johnynek
Copy link
Collaborator

johnynek commented Jan 2, 2017

I have not investigated why the tests are red.

If we can make them green with something faster. I'd be happy to do that.

I don't think using Batched would be too much work. Or we could copy this code or add this dependency:

https://github.com/non/chain/blob/master/src/main/scala/chain/Chain.scala

It is a single file library that is the more general version of Batched (it has empty), it was written by @non who also wrote Batched.

Do we really need an empty Batched? I would imagine that a Monoid[Option[Batched[T]]] would be almost as fast (still faster than Stream).

It is up to you. I think killing the O(N^2) is most important. Losing a constant factor of 4 is probably not a huge deal if you don't want to work on this other stuff.

Not using a mutable data structure is pretty important to me since this code has been worked on by many people now, and it is much easier to make a mistake with mutable APIs.

@pankajroark
Copy link
Contributor Author

pankajroark commented Jan 2, 2017 via email

@johnynek
Copy link
Collaborator

johnynek commented Jan 2, 2017 via email

@pankajroark
Copy link
Contributor Author

pankajroark commented Jan 2, 2017 via email

@pankajroark
Copy link
Contributor Author

Chain seems good:
[info] ToBenchmark.batchedConcat thrpt 4 133738359.414 ± 3281958.077 ops/s
[info] ToBenchmark.chainConcat thrpt 4 90914620.450 ± 19299667.614 ops/s
[info] ToBenchmark.iterConcat thrpt 4 122448980.991 ± 38379171.178 ops/s
[info] ToBenchmark.streamConcat thrpt 4 26017386.004 ± 6404231.437 ops/s

I've updated the review with now using chain.

@pankajroark
Copy link
Contributor Author

Tests pass now, just waiting for shipit to merge.

@@ -149,12 +150,13 @@ case class BaseBolt[I, O](jobID: JobId,
}
}

private def finish(inputs: Stream[InputState[Tuple]], results: TraversableOnce[O]) {
private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) {
val tuples = inputs.iterator.map(_.state).toList
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we move this line to 158 (only materialize the List if we have dependants and we anchor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning was that we need to iterate through the chain anyway to get the size for the log statement at the end of function but I agree, materialization of list should be avoided as well in those cases. Proposed a fix.

var emitCount = 0
if (hasDependants) {
if (anchorTuples.anchor) {
results.foreach { result =>
val tuples = inputs.iterator.map(_.state).toList
numTuples = Some(tuples.size)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list .size is O(N) not O(1). So if we are going to do this, why use the var and not just use use inputs.iterator.size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We should be able to calculate the size and list in one pass though. Let me try.

var emitCount = 0
if (hasDependants) {
if (anchorTuples.anchor) {
results.foreach { result =>
val tuples = inputs.iterator.map(_.state).toList
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we want this above the foreach? We don't want to recompute tuples for each result. We just want it once and then reuse for all results, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good idea. I believe this was how it was in original code too so I didn't notice. Let me try to fix this.

@codecov-io
Copy link

codecov-io commented Jan 4, 2017

Current coverage is 70.96% (diff: 80.95%)

No coverage report found for develop at 9a80b22.

Powered by Codecov. Last update 9a80b22...a90c9da

@johnynek
Copy link
Collaborator

johnynek commented Jan 4, 2017

👍

@pankajroark pankajroark merged commit 77b65d5 into develop Jan 4, 2017
@ttim
Copy link
Collaborator

ttim commented Jan 4, 2017

What is a reason to expose Chain on a level of OperationContainer ? I'm not strictly against that but Traversable or even TraversableOnce seems more appropriate for me.

To be more precise - AsyncSummer instances should be over Chain to avoid N^2 complexity while everything else should be over Traversable/TraversableOnce. What do you think?

@johnynek
Copy link
Collaborator

johnynek commented Jan 4, 2017

I think making a follow up PR that minimized the scope of visibility of Chain (maybe even walking back some of the mima exclusions) would be fine. But if we have to copy to do it, I would not.

For instance, you can go: Iterable[T] => Chain[T] but not Iterator[T] => Chain[T] without a copy.

So, maybe we could use Iterable[T] in some cases, and internally use a Chain[T] to do fast concat.

That said, these are fairly "private" classes in that 99% of summingbird users would never use them. In fact, likely only storm/heron platform would use them.

@johnynek johnynek deleted the pg/inputstate_seq_opt branch January 4, 2017 17:42
@ttim
Copy link
Collaborator

ttim commented Jan 4, 2017

I agree.

Regarding to copying - we don't need Iterator[T] => Chain[T] transformation because OperationContainer#execute accepts single state element. But we need Chain[T] => Iterable[T] transformation which is the same (in terms of copying) as Chain[T] => Iterator[T].

@pankajroark
Copy link
Contributor Author

I agree exposing chain at OperationContainer is not ideal and also that OperationContainer is used only in storm/heron platform right now, so I feel it's better to keep it simple, as is, for now. Let me create an issue to capture this though.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants