diff --git a/rxjava-core/src/main/java/rx/Notification.java b/rxjava-core/src/main/java/rx/Notification.java index 866ed06450..996e217d50 100644 --- a/rxjava-core/src/main/java/rx/Notification.java +++ b/rxjava-core/src/main/java/rx/Notification.java @@ -116,6 +116,16 @@ public boolean isOnNext() { return getKind() == Kind.OnNext; } + public void accept(Observer observer) { + if (isOnNext()) { + observer.onNext(getValue()); + } else if (isOnCompleted()) { + observer.onCompleted(); + } else if (isOnError()) { + observer.onError(getThrowable()); + } + } + public static enum Kind { OnNext, OnError, OnCompleted } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index e1272b8152..1826ff959c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -15,13 +15,20 @@ */ package rx.operators; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Notification; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.CurrentThreadScheduler; import rx.concurrency.ImmediateScheduler; import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; /** * Asynchronously notify Observers on the specified Scheduler. @@ -38,6 +45,9 @@ private static class ObserveOn implements OnSubscribeFunc { private final Observable source; private final Scheduler scheduler; + final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); + final AtomicInteger counter = new AtomicInteger(0); + public ObserveOn(Observable source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; @@ -48,11 +58,55 @@ public Subscription onSubscribe(final Observer observer) { if (scheduler instanceof ImmediateScheduler) { // do nothing if we request ImmediateScheduler so we don't invoke overhead return source.subscribe(observer); + } else if (scheduler instanceof CurrentThreadScheduler) { + // do nothing if we request CurrentThreadScheduler so we don't invoke overhead + return source.subscribe(observer); } else { - CompositeSubscription s = new CompositeSubscription(); - s.add(source.subscribe(new ScheduledObserver(s, observer, scheduler))); - return s; + return observeOn(observer, scheduler); } } + + public Subscription observeOn(final Observer observer, Scheduler scheduler) { + final CompositeSubscription s = new CompositeSubscription(); + + s.add(source.materialize().subscribe(new Action1>() { + + @Override + public void call(Notification e) { + // this must happen before 'counter' is used to provide synchronization between threads + queue.offer(e); + + // we now use counter to atomically determine if we need to start processing or not + // it will be 0 if it's the first notification or the scheduler has finished processing work + // and we need to start doing it again + if (counter.getAndIncrement() == 0) { + processQueue(s, observer); + } + + } + })); + + return s; + } + + private void processQueue(CompositeSubscription s, final Observer observer) { + s.add(scheduler.schedule(new Action1() { + @Override + public void call(Action0 self) { + Notification not = queue.poll(); + if (not != null) { + not.accept(observer); + } + + // decrement count and if we still have work to do + // recursively schedule ourselves to process again + if (counter.decrementAndGet() > 0) { + self.call(); + } + + } + })); + } } + } diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java deleted file mode 100644 index 0bfef9c2c9..0000000000 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import rx.Notification; -import rx.Observer; -import rx.Scheduler; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.util.functions.Func2; - -/* package */class ScheduledObserver implements Observer { - private final Observer underlying; - private final Scheduler scheduler; - private final CompositeSubscription parentSubscription; - private final EventLoop eventLoop = new EventLoop(); - final AtomicInteger counter = new AtomicInteger(); - private final AtomicBoolean started = new AtomicBoolean(); - - private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - - public ScheduledObserver(CompositeSubscription s, Observer underlying, Scheduler scheduler) { - this.parentSubscription = s; - this.underlying = underlying; - this.scheduler = scheduler; - } - - @Override - public void onCompleted() { - enqueue(new Notification()); - } - - @Override - public void onError(final Throwable e) { - enqueue(new Notification(e)); - } - - @Override - public void onNext(final T args) { - enqueue(new Notification(args)); - } - - private void enqueue(Notification notification) { - // this must happen before synchronization between threads - queue.offer(notification); - - /** - * If the counter is currently at 0 (before incrementing with this addition) - * we will schedule the work. - */ - if (counter.getAndIncrement() <= 0) { - if (!started.get() && started.compareAndSet(false, true)) { - // first time we use the parent scheduler to start the event loop - MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); - parentSubscription.add(scheduler.schedule(recursiveSubscription, eventLoop)); - parentSubscription.add(recursiveSubscription); - } else { - // subsequent times we reschedule existing one - eventLoop.reschedule(); - } - } - } - - private class EventLoop implements Func2 { - - volatile Scheduler _recursiveScheduler; - volatile MultipleAssignmentSubscription _recursiveSubscription; - - public void reschedule() { - _recursiveSubscription.setSubscription(_recursiveScheduler.schedule(_recursiveSubscription, this)); - } - - @Override - public Subscription call(Scheduler s, MultipleAssignmentSubscription recursiveSubscription) { - /* - * -------------------------------------------------------------------------------------- - * Set these the first time through so we can externally trigger recursive execution again - */ - if (_recursiveScheduler == null) { - _recursiveScheduler = s; - } - if (_recursiveSubscription == null) { - _recursiveSubscription = recursiveSubscription; - } - /* - * Back to regular flow - * -------------------------------------------------------------------------------------- - */ - - do { - Notification notification = queue.poll(); - // if we got a notification, send it - if (notification != null) { - - // if unsubscribed stop working - if (parentSubscription.isUnsubscribed()) { - return parentSubscription; - } - // process notification - - switch (notification.getKind()) { - case OnNext: - underlying.onNext(notification.getValue()); - break; - case OnError: - underlying.onError(notification.getThrowable()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + notification); - } - } - } while (counter.decrementAndGet() > 0); - - return parentSubscription; - } - } -} diff --git a/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java b/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java index e99a25eaf3..a89da84304 100644 --- a/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java +++ b/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java @@ -28,12 +28,17 @@ public void testUnsubscribeOfNewThread() throws InterruptedException { public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException { testUnSubscribeForScheduler(Schedulers.threadPoolForIO()); } - + @Test public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException { testUnSubscribeForScheduler(Schedulers.threadPoolForComputation()); } - + + @Test + public void testUnsubscribeOfImmediateThread() throws InterruptedException { + testUnSubscribeForScheduler(Schedulers.immediate()); + } + @Test public void testUnsubscribeOfCurrentThread() throws InterruptedException { testUnSubscribeForScheduler(Schedulers.currentThread()); @@ -56,7 +61,7 @@ public Long call(Long aLong) { } }) .subscribeOn(scheduler) - .observeOn(Schedulers.currentThread()) + .observeOn(scheduler) .subscribe(new Observer() { @Override public void onCompleted() { diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java index e114f637ff..15f1a44fb3 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,6 +16,7 @@ package rx.operators; import static org.junit.Assert.*; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static rx.operators.OperationObserveOn.*; @@ -30,6 +31,7 @@ import rx.Observable; import rx.Observer; import rx.concurrency.Schedulers; +import rx.util.functions.Action1; public class OperationObserveOnTest { @@ -81,4 +83,53 @@ public Void answer(InvocationOnMock invocation) throws Throwable { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + @SuppressWarnings("unchecked") + public void testThreadName() throws InterruptedException { + Observable obs = Observable.from("one", null, "two", "three", "four"); + + Observer observer = mock(Observer.class); + + InOrder inOrder = inOrder(observer); + + final CountDownLatch completedLatch = new CountDownLatch(1); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + completedLatch.countDown(); + + return null; + } + }).when(observer).onCompleted(); + + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + completedLatch.countDown(); + + return null; + } + }).when(observer).onError(any(Exception.class)); + + obs.observeOn(Schedulers.newThread()).doOnEach(new Action1() { + + @Override + public void call(String t1) { + String threadName = Thread.currentThread().getName(); + boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler"); + System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName); + assertTrue(correctThreadName); + } + + }).subscribe(observer); + + if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting"); + } + + inOrder.verify(observer, times(1)).onCompleted(); + } }