From c0c2e8b77e4b01954fcfd712fdefd90e25b387e5 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 10 Mar 2014 16:42:23 -0700 Subject: [PATCH] Make OperatorObserveOnTest.testNonBlockingOuterWhileBlockingOnNext deterministic reported at https://twitter.com/jaceklaskowski/status/443153927069249536 --- .../rx/operators/OperatorObserveOnTest.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 569aee115a..037060298b 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -309,16 +309,17 @@ public void call(Integer t1) { @Test public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch completedLatch = new CountDownLatch(1); + final CountDownLatch nextLatch = new CountDownLatch(1); final AtomicLong completeTime = new AtomicLong(); // use subscribeOn to make async, observeOn to move - Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer() { + Observable.range(1, 2).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer() { @Override public void onCompleted() { System.out.println("onCompleted"); completeTime.set(System.nanoTime()); - latch.countDown(); + completedLatch.countDown(); } @Override @@ -328,20 +329,27 @@ public void onError(Throwable e) { @Override public void onNext(Integer t) { - + // don't let this thing finish yet + try { + if (!nextLatch.await(1000, TimeUnit.MILLISECONDS)) { + throw new RuntimeException("it shouldn't have timed out"); + } + } catch (InterruptedException e) { + throw new RuntimeException("it shouldn't have failed"); + } } }); long afterSubscribeTime = System.nanoTime(); - System.out.println("After subscribe: " + latch.getCount()); - assertEquals(1, latch.getCount()); - latch.await(); + System.out.println("After subscribe: " + completedLatch.getCount()); + assertEquals(1, completedLatch.getCount()); + nextLatch.countDown(); + completedLatch.await(1000, TimeUnit.MILLISECONDS); assertTrue(completeTime.get() > afterSubscribeTime); System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); } - private static int randomIntFrom0to100() { // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml long x = System.nanoTime();