Skip to content

Commit

Permalink
Make OperatorObserveOnTest.testNonBlockingOuterWhileBlockingOnNext de…
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Mar 10, 2014
1 parent a870624 commit c0c2e8b
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,17 @@ public void call(Integer t1) {
@Test
public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException {

final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch completedLatch = new CountDownLatch(1);
final CountDownLatch nextLatch = new CountDownLatch(1);
final AtomicLong completeTime = new AtomicLong();
// use subscribeOn to make async, observeOn to move
Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer<Integer>() {
Observable.range(1, 2).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {
System.out.println("onCompleted");
completeTime.set(System.nanoTime());
latch.countDown();
completedLatch.countDown();
}

@Override
Expand All @@ -328,20 +329,27 @@ public void onError(Throwable e) {

@Override
public void onNext(Integer t) {

// don't let this thing finish yet
try {
if (!nextLatch.await(1000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("it shouldn't have timed out");
}
} catch (InterruptedException e) {
throw new RuntimeException("it shouldn't have failed");
}
}

});

long afterSubscribeTime = System.nanoTime();
System.out.println("After subscribe: " + latch.getCount());
assertEquals(1, latch.getCount());
latch.await();
System.out.println("After subscribe: " + completedLatch.getCount());
assertEquals(1, completedLatch.getCount());
nextLatch.countDown();
completedLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(completeTime.get() > afterSubscribeTime);
System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime));
}


private static int randomIntFrom0to100() {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
Expand Down

0 comments on commit c0c2e8b

Please sign in to comment.