Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler correctness improvements. #1158

Merged
merged 2 commits into from
May 6, 2014

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented May 6, 2014

Second round on the scheduler correctness issue.

Sure it looks more heavy as ScheduledAction now has its own inner CompositeSubscription and a shared reference to the parent innerSubscription.

I've tried to benchmark it with SchedulerPerformanceTests but that test is flawed:

  • Multiple threads pound on the same long sum value, so naturally additions get lost.
  • The LongObserver gets unsubscribed after the first loop so from will not actually call onNext but isUnsubscribed a lot.
  • Does not wait for the computations to finish and basically measures how fast 5M tasks can be added to the NewThreadScheduler's innerSubscription.

The flawed test gives ~ 11M ops/sec on my machine. If I fix the test and run against the master, 5M takes extremely long to finish due to the inherent slowness of add/remove in CompositeSubscription if large. On my 4 core hyperthread enabled machine, I get 50k-150k ops/second for baseline with a 100k loop.

The proposed changes run with the flawed test gives ~10.8M ops/sec. With the test fixed and with the proposed changes, I get ~1.2M ops/sec.

@cloudbees-pull-request-builder

RxJava-pull-requests #1070 FAILURE
Looks like there's a problem with this pull request

@daschl
Copy link
Contributor

daschl commented May 6, 2014

@akarnokd do you need help porting it over to JMH? I can assist with that.

@akarnokd
Copy link
Member Author

akarnokd commented May 6, 2014

That would be great, but I don't know how to run a JHM benchmark from NetBeans.

@daschl
Copy link
Contributor

daschl commented May 6, 2014

@akarnokd ideally you run it from the command line as a encapsulated package.

Once you write the benchmark, you package it up as a fat jar and then you can execute your test only.. ala: ../gradlew benchmarks '-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .OperatorSerializePerf.'

but just replace the name of the benchmark with yours, otherwise it will run this one or all if you leave it out.

Note that with some fiddling around, I did this for netty, you can fake it from your IDE by either writing a custom Junit runner or just doing it like this: https://github.com/netty/netty/blob/master/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java

@jbripley
Copy link
Contributor

jbripley commented May 6, 2014

Looks like OnActionComplete is no longer used and could be removed?

@akarnokd
Copy link
Member Author

akarnokd commented May 6, 2014

True. I'm working on something else right now so I come back later to remove it.

@cloudbees-pull-request-builder

RxJava-pull-requests #1074 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

I don't know how to run a JHM benchmark from NetBeans.

I always run them from command line. See comments on how to run here: https://github.com/Netflix/RxJava/blob/master/build.gradle#L92

@benjchristensen
Copy link
Member

you package it up as a fat jar

This option also exists with the shadow task that needs to be renamed to something like benchmarkJar: https://github.com/Netflix/RxJava/blob/master/build.gradle#L132

@akarnokd
Copy link
Member Author

akarnokd commented May 6, 2014

@benjchristensen
Copy link
Member

I added performance tests for observeOn and subscribeOn using Schedulers.computation(): https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/perf/java/rx/schedulers/ComputationSchedulerPerf.java

This pull request is far faster on startup (when subscribing to an Observable of 1 item) and somewhat faster on long-running Observables.

../gradlew benchmarks '-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ComputationSchedulerPerf.*'

New Code

Result : 235340.831 ±(99.9%) 490947.687 ns/op
  Statistics: (min, avg, max) = (90922.636, 235340.831, 420715.962), stdev = 127497.555
  Confidence interval (99.9%): [-255606.856, 726288.517]


Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5     2180.371      207.253    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   128238.555    17976.667    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 149856485.714 19627546.003    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5     2197.782      267.688    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5   414051.806  1536527.849    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5   235340.831   490947.687    ns/op
Result : 1509019.668 ±(99.9%) 4443411.014 ns/op
  Statistics: (min, avg, max) = (300644.144, 1509019.668, 2913322.987), stdev = 1153939.730
  Confidence interval (99.9%): [-2934391.346, 5952430.682]


Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5     2056.254      554.314    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   101872.671    12964.522    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 147002257.143  9201133.563    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5     2235.005      336.381    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5   550600.029   482135.695    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5  1509019.668  4443411.014    ns/op

Master

Result : 63507.629 ±(99.9%) 131757.785 ns/op
  Statistics: (min, avg, max) = (38513.407, 63507.629, 123730.129), stdev = 34217.078
  Confidence interval (99.9%): [-68250.156, 195265.414]


Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5    51867.073    77712.741    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   118883.112    14487.931    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 154828171.429 20611654.844    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5    68898.012   200669.541    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5    71403.833   209712.445    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5    63507.629   131757.785    ns/op
Result : 183126.150 ±(99.9%) 1168534.854 ns/op
  Statistics: (min, avg, max) = (37927.424, 183126.150, 725869.905), stdev = 303464.791
  Confidence interval (99.9%): [-985408.704, 1351661.004]


Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5    50085.795    64485.544    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   123673.009     3130.488    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 151220000.000 20063502.154    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5    73814.774   202116.247    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5    72216.122   186483.284    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5   183126.150  1168534.854    ns/op

@benjchristensen
Copy link
Member

One thing I don't like about how this is currently working is that we have to create the ScheduledAction each time, and that feels wasteful when we immediately are putting it inside a queue inside PoolWorker. It feels like we should be able to unsubscribe the queue and not need to maintain the CompositeSubscription of ScheduledAction instances.

This may fit with what @spodila is working on as I believe it will maintain a queue per Worker and the underlying threads will pull off the queues. In other words, by eliminating the use of Executor we get better control of the thread and can get rid of a layer of abstraction.

Going to merge this however as it is better than the current implementation and we'll continue improving this in #1149

benjchristensen added a commit that referenced this pull request May 6, 2014
Scheduler correctness improvements.
@benjchristensen benjchristensen merged commit 0302369 into ReactiveX:master May 6, 2014
@akarnokd akarnokd deleted the SchedulerFixes0506 branch May 6, 2014 20:09
@akarnokd
Copy link
Member Author

akarnokd commented May 9, 2014

Ooops, I forgot to mention there might be some unwanted task executions after NewThreadScheduler is unsubscribed because executor.shutdown() does not remove pending tasks, executor.shutdownNow() does it.

@akarnokd
Copy link
Member Author

akarnokd commented May 9, 2014

EventLoopScheduler is not affected because its worker keeps track of the tasks and cancels them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants