From 89bb9dbdf7e73c8238dc4a92c8281e8ca3a5ec53 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 6 Feb 2014 15:13:59 +0800 Subject: [PATCH 1/3] Reimplement 'subscribeOn' using 'lift' --- rxjava-core/src/main/java/rx/Observable.java | 4 +- .../rx/operators/OperationSubscribeOn.java | 97 ---------- .../rx/operators/OperatorSubscribeOn.java | 105 +++++++++++ .../operators/OperationSubscribeOnTest.java | 54 ------ .../rx/operators/OperatorSubscribeOnTest.java | 173 ++++++++++++++++++ 5 files changed, 280 insertions(+), 153 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java delete mode 100644 rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 50fdfdc338..393b7e5d86 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -80,7 +80,6 @@ import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; -import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; @@ -97,6 +96,7 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; +import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorZip; import rx.operators.OperatorCast; import rx.operators.OperatorFromIterable; @@ -6967,7 +6967,7 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc * @see RxJava Wiki: subscribeOn() */ public final Observable subscribeOn(Scheduler scheduler) { - return create(OperationSubscribeOn.subscribeOn(this, scheduler)); + return from(this).lift(new OperatorSubscribeOn(scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java deleted file mode 100644 index 586d9c4287..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicReference; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Inner; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Action1; - -/** - * Asynchronously subscribes and unsubscribes Observers on the specified Scheduler. - *

- * - */ -public class OperationSubscribeOn { - - public static OnSubscribeFunc subscribeOn(Observable source, Scheduler scheduler) { - return new SubscribeOn(source, scheduler); - } - - private static class SubscribeOn implements OnSubscribeFunc { - private final Observable source; - private final Scheduler scheduler; - - public SubscribeOn(Observable source, Scheduler scheduler) { - this.source = source; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - final CompositeSubscription s = new CompositeSubscription(); - scheduler.schedule(new Action1() { - - @Override - public void call(final Inner inner) { - s.add(new ScheduledSubscription(source.subscribe(observer), inner)); - } - - }); - // only include the ScheduledSubscription - // but not the actual Subscription from the Scheduler as we need to schedule the unsubscribe action - // and therefore can't unsubscribe the scheduler until after the unsubscribe happens - return s; - } - } - - private static class ScheduledSubscription implements Subscription { - private final Subscription underlying; - private volatile boolean unsubscribed = false; - private final Scheduler.Inner scheduler; - - private ScheduledSubscription(Subscription underlying, Inner scheduler) { - this.underlying = underlying; - this.scheduler = scheduler; - } - - @Override - public void unsubscribe() { - unsubscribed = true; - scheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - underlying.unsubscribe(); - // tear down this subscription as well now that we're done -// inner.unsubscribe(); - } - }); - } - - @Override - public boolean isUnsubscribed() { - return unsubscribed; - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java new file mode 100644 index 0000000000..44a48af955 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -0,0 +1,105 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Asynchronously subscribes and unsubscribes Observers on the specified Scheduler. + *

+ * + */ +public class OperatorSubscribeOn implements Operator> { + + private final Scheduler scheduler; + + public OperatorSubscribeOn(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public Subscriber> call( + final Subscriber subscriber) { + return new Subscriber>() { + + @Override + public void onCompleted() { + // ignore + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(final Observable o) { + scheduler.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + if (!inner.isUnsubscribed()) { + final CompositeSubscription cs = new CompositeSubscription(); + subscriber.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + scheduler.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + cs.unsubscribe(); + inner.unsubscribe(); + } + + }); + } + + })); + cs.add(subscriber); + o.subscribe(new Subscriber(cs) { + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + }); + } + } + + }); + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java deleted file mode 100644 index 380c78eb02..0000000000 --- a/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.OperationSubscribeOn.*; - -import org.junit.Test; - -import rx.Observable; -import rx.Observer; -import rx.Scheduler; -import rx.Subscription; -import rx.schedulers.Schedulers; -import rx.test.OperatorTester; -import rx.util.functions.Action1; - -public class OperationSubscribeOnTest { - - @Test - @SuppressWarnings("unchecked") - public void testSubscribeOn() { - Observable w = Observable.from(1, 2, 3); - - Scheduler scheduler = spy(OperatorTester.forwardingScheduler(Schedulers.immediate())); - - Observer observer = mock(Observer.class); - Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); - - verify(scheduler, times(1)).schedule(any(Action1.class)); - subscription.unsubscribe(); - verify(scheduler, times(1)).schedule(any(Action1.class)); - verifyNoMoreInteractions(scheduler); - - verify(observer, times(1)).onNext(1); - verify(observer, times(1)).onNext(2); - verify(observer, times(1)).onNext(3); - verify(observer, times(1)).onCompleted(); - } -} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java new file mode 100644 index 0000000000..28395d5845 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -0,0 +1,173 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; +import rx.observers.TestObserver; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; +import rx.test.OperatorTester; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +public class OperatorSubscribeOnTest { + + @Test + @SuppressWarnings("unchecked") + public void testSubscribeOn() { + Observable w = Observable.from(Arrays.asList(1, 2, 3)); + + Scheduler scheduler = spy(OperatorTester.forwardingScheduler(Schedulers + .immediate())); + + TestObserver observer = new TestObserver(); + w.subscribeOn(scheduler).subscribe(observer); + + InOrder inOrder = inOrder(scheduler); + // The first one is for "subscribe", the second one is for + // "unsubscribe". + inOrder.verify(scheduler, times(2)).schedule(isA(Action1.class)); + inOrder.verifyNoMoreInteractions(); + + observer.assertReceivedOnNext(Arrays.asList(1, 2, 3)); + observer.assertTerminalEvent(); + } + + private class ThreadSubscription implements Subscription { + private volatile Thread thread; + + private final CountDownLatch latch = new CountDownLatch(1); + + private final Subscription s = Subscriptions.create(new Action0() { + + @Override + public void call() { + thread = Thread.currentThread(); + latch.countDown(); + } + + }); + + @Override + public void unsubscribe() { + s.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return s.isUnsubscribed(); + } + + public Thread getThread() throws InterruptedException { + latch.await(); + return thread; + } + } + + @Test + public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads() + throws InterruptedException { + final ThreadSubscription subscription = new ThreadSubscription(); + final AtomicReference subscribeThread = new AtomicReference(); + Observable w = Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber t1) { + subscribeThread.set(Thread.currentThread()); + t1.add(subscription); + t1.onNext(1); + t1.onNext(2); + t1.onCompleted(); + } + }); + + TestObserver observer = new TestObserver(); + w.subscribeOn(Schedulers.computation()).subscribe(observer); + + Thread unsubscribeThread = subscription.getThread(); + + assertNotNull(unsubscribeThread); + assertNotSame(Thread.currentThread(), unsubscribeThread); + + assertNotNull(subscribeThread); + assertNotSame(Thread.currentThread(), subscribeThread); + + observer.assertReceivedOnNext(Arrays.asList(1, 2)); + observer.assertTerminalEvent(); + } + + @Test + public void testIssue813() throws InterruptedException { + // https://github.com/Netflix/RxJava/issues/813 + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(1); + + TestObserver observer = new TestObserver(); + final ThreadSubscription s = new ThreadSubscription(); + + final Subscription subscription = Observable + .create(new Observable.OnSubscribe() { + @Override + public void call( + final Subscriber subscriber) { + subscriber.add(s); + try { + latch.await(); + // Already called "unsubscribe", "isUnsubscribed" + // shouble be true + if (!subscriber.isUnsubscribed()) { + throw new IllegalStateException( + "subscriber.isUnsubscribed should be true"); + } + subscriber.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable e) { + subscriber.onError(e); + } finally { + doneLatch.countDown(); + } + } + }).subscribeOn(Schedulers.computation()).subscribe(observer); + + subscription.unsubscribe(); + // As unsubscribe is called in other thread, we need to wait for it. + s.getThread(); + latch.countDown(); + doneLatch.await(); + assertEquals(0, observer.getOnErrorEvents().size()); + assertEquals(1, observer.getOnCompletedEvents().size()); + } +} From 8915b8bb5ae9219809368c3ef66406f075327fb6 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 6 Feb 2014 16:49:10 +0800 Subject: [PATCH 2/3] Remove 'inner.unsubscribe' --- rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java | 1 - 1 file changed, 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index 44a48af955..674db1d554 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -69,7 +69,6 @@ public void call() { @Override public void call(final Inner inner) { cs.unsubscribe(); - inner.unsubscribe(); } }); From 90f814fa7292aca9044c72da5371c0805749dd11 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 6 Feb 2014 21:46:12 +0800 Subject: [PATCH 3/3] Using inner and fix unit tests --- .../rx/operators/OperatorSubscribeOn.java | 71 +++++++++---------- .../rx/operators/OperatorSubscribeOnTest.java | 38 ++-------- 2 files changed, 40 insertions(+), 69 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index 674db1d554..94543d67cd 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -58,44 +58,41 @@ public void onNext(final Observable o) { @Override public void call(final Inner inner) { - if (!inner.isUnsubscribed()) { - final CompositeSubscription cs = new CompositeSubscription(); - subscriber.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - scheduler.schedule(new Action1() { - - @Override - public void call(final Inner inner) { - cs.unsubscribe(); - } - - }); - } - - })); - cs.add(subscriber); - o.subscribe(new Subscriber(cs) { - - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - }); - } + final CompositeSubscription cs = new CompositeSubscription(); + subscriber.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + inner.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + cs.unsubscribe(); + } + + }); + } + + })); + cs.add(subscriber); + o.subscribe(new Subscriber(cs) { + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + }); } - }); } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index 28395d5845..c6e01bc71d 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -18,53 +18,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; +import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; -import org.mockito.InOrder; import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Scheduler; import rx.Subscriber; import rx.Subscription; import rx.observers.TestObserver; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; -import rx.test.OperatorTester; import rx.util.functions.Action0; -import rx.util.functions.Action1; public class OperatorSubscribeOnTest { - @Test - @SuppressWarnings("unchecked") - public void testSubscribeOn() { - Observable w = Observable.from(Arrays.asList(1, 2, 3)); - - Scheduler scheduler = spy(OperatorTester.forwardingScheduler(Schedulers - .immediate())); - - TestObserver observer = new TestObserver(); - w.subscribeOn(scheduler).subscribe(observer); - - InOrder inOrder = inOrder(scheduler); - // The first one is for "subscribe", the second one is for - // "unsubscribe". - inOrder.verify(scheduler, times(2)).schedule(isA(Action1.class)); - inOrder.verifyNoMoreInteractions(); - - observer.assertReceivedOnNext(Arrays.asList(1, 2, 3)); - observer.assertTerminalEvent(); - } - private class ThreadSubscription implements Subscription { private volatile Thread thread; @@ -114,15 +86,17 @@ public void call(Subscriber t1) { }); TestObserver observer = new TestObserver(); - w.subscribeOn(Schedulers.computation()).subscribe(observer); + w.subscribeOn(Schedulers.newThread()).subscribe(observer); Thread unsubscribeThread = subscription.getThread(); assertNotNull(unsubscribeThread); assertNotSame(Thread.currentThread(), unsubscribeThread); - assertNotNull(subscribeThread); - assertNotSame(Thread.currentThread(), subscribeThread); + assertNotNull(subscribeThread.get()); + assertNotSame(Thread.currentThread(), subscribeThread.get()); + // True for Schedulers.newThread() + assertTrue(unsubscribeThread == subscribeThread.get()); observer.assertReceivedOnNext(Arrays.asList(1, 2)); observer.assertTerminalEvent();