Skip to content

Commit

Permalink
Merge pull request #1243 from benjchristensen/perf
Browse files Browse the repository at this point in the history
Remove Subscription Wrapper from Observable.subscribe
  • Loading branch information
benjchristensen committed May 22, 2014
2 parents 4ae5333 + 0db09c3 commit a2f4782
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 10 deletions.
11 changes: 1 addition & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6255,16 +6255,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
subscriber = new SafeSubscriber<T>(subscriber);
}
onSubscribeFunction.call(subscriber);
final Subscription returnSubscription = hook.onSubscribeReturn(subscriber);
// we return it inside a Subscription so it can't be cast back to Subscriber
return Subscriptions.create(new Action0() {

@Override
public void call() {
returnSubscription.unsubscribe();
}

});
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
Expand Down
30 changes: 30 additions & 0 deletions rxjava-core/src/perf/java/rx/usecases/PerfObserveOn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.usecases;

import org.openjdk.jmh.annotations.GenerateMicroBenchmark;

import rx.schedulers.Schedulers;

public class PerfObserveOn {

@GenerateMicroBenchmark
public void observeOn(UseCaseInput input) throws InterruptedException {
input.observable.observeOn(Schedulers.computation()).subscribe(input.observer);
input.awaitCompletion();
}

}
71 changes: 71 additions & 0 deletions rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.usecases;

import org.openjdk.jmh.annotations.GenerateMicroBenchmark;

import rx.Observable;
import rx.functions.Func1;

public class PerfTransforms {

@GenerateMicroBenchmark
public void mapTransformation(UseCaseInput input) throws InterruptedException {
input.observable.map(new Func1<Integer, String>() {

@Override
public String call(Integer i) {
return String.valueOf(i);
}

}).map(new Func1<String, Integer>() {

@Override
public Integer call(String i) {
return Integer.parseInt(i);
}

}).subscribe(input.observer);
input.awaitCompletion();
}

@GenerateMicroBenchmark
public void flatMapTransformsUsingFrom(UseCaseInput input) throws InterruptedException {
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(Integer i) {
return Observable.from(i);
}

}).subscribe(input.observer);
input.awaitCompletion();
}

@GenerateMicroBenchmark
public void flatMapTransformsUsingJust(UseCaseInput input) throws InterruptedException {
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(Integer i) {
return Observable.just(i);
}

}).subscribe(input.observer);
input.awaitCompletion();
}

}
81 changes: 81 additions & 0 deletions rxjava-core/src/perf/java/rx/usecases/UseCaseInput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.usecases;

import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.logic.BlackHole;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;

/**
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
*/
@State(Scope.Thread)
public class UseCaseInput {
@Param({ "1", "1024" })
public int size;

public Observable<Integer> observable;
public Observer<Integer> observer;

private CountDownLatch latch;

@Setup
public void setup() {
observable = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> o) {
for (int value = 0; value < size; value++) {
o.onNext(value);
}
o.onCompleted();
}
});

final BlackHole bh = new BlackHole();
latch = new CountDownLatch(1);

observer = new Observer<Integer>() {
@Override
public void onCompleted() {
latch.countDown();
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Integer value) {
bh.consume(value);
}
};

}

public void awaitCompletion() throws InterruptedException {
latch.await();
}
}

0 comments on commit a2f4782

Please sign in to comment.