From 9c68e5ba5ae7048d9c9097e569a218a6f82a9a9b Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 28 Apr 2013 22:05:52 +0200 Subject: [PATCH] Initial implementation of AsyncSubject --- .../main/java/rx/subjects/AsyncSubject.java | 253 ++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/subjects/AsyncSubject.java diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java new file mode 100644 index 0000000000..f496b76e4b --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -0,0 +1,253 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.anyString; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.Mockito; + +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Subject that publishes only the last event to each {@link Observer} that has subscribed when the completes. + *

+ * Example usage: + *

+ *

 {@code
+ 
+  // observer will receive no onNext events.
+  AsyncSubject subject = AsyncSubject.create();
+  subject.subscribe(observer);
+  subject.onNext("one");
+  subject.onNext("two");
+  subject.onNext("three");
+ 
+  // observer will receive "three" as the only onNext event.
+  AsyncSubject subject = AsyncSubject.create();
+  subject.subscribe(observer);
+  subject.onNext("one");
+  subject.onNext("two");
+  subject.onNext("three");
+  subject.onCompleted();
+ 
+  } 
+ * 
+ * @param 
+ */
+public class AsyncSubject extends Subject {
+	
+    public static  AsyncSubject create() {
+        final ConcurrentHashMap> observers = new ConcurrentHashMap>();
+
+        Func1, Subscription> onSubscribe = new Func1, Subscription>() {
+            @Override
+            public Subscription call(Observer observer) {
+                final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
+
+                subscription.wrap(new Subscription() {
+                    @Override
+                    public void unsubscribe() {
+                        // on unsubscribe remove it from the map of outbound observers to notify
+                        observers.remove(subscription);
+                    }
+                });
+
+                // on subscribe add it to the map of outbound observers to notify
+                observers.put(subscription, new SynchronizedObserver(observer, subscription));
+                return subscription;
+            }
+        };
+
+        return new AsyncSubject(onSubscribe, observers);
+    }
+
+    private final ConcurrentHashMap> observers;
+    private final AtomicReference currentValue;
+
+    protected AsyncSubject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) {
+        super(onSubscribe);
+        this.observers = observers;
+        this.currentValue = new AtomicReference();
+    }
+
+    @Override
+    public void onCompleted() {
+    	T finalValue = currentValue.get();
+    	for (Observer observer : observers.values()) {
+			observer.onNext(finalValue);
+    	}
+        for (Observer observer : observers.values()) {
+            observer.onCompleted();
+        }
+    }
+
+    @Override
+    public void onError(Exception e) {
+        for (Observer observer : observers.values()) {
+            observer.onError(e);
+        }
+    }
+
+    @Override
+    public void onNext(T args) {
+        currentValue.set(args);
+    }
+
+    public static class UnitTest {
+
+        private final Exception testException = new Exception();
+
+        @Test
+        public void testNeverCompleted() {
+        	AsyncSubject subject = AsyncSubject.create();
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+
+            assertNeverCompletedObserver(aObserver);
+        }
+
+        private void assertNeverCompletedObserver(Observer aObserver)
+        {
+            verify(aObserver, Mockito.never()).onNext(anyString());
+            verify(aObserver, Mockito.never()).onError(testException);
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+        
+        @Test
+        public void testCompleted() {
+        	AsyncSubject subject = AsyncSubject.create();
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+            subject.onCompleted();
+
+            assertCompletedObserver(aObserver);
+        }
+
+        private void assertCompletedObserver(Observer aObserver)
+        {
+            verify(aObserver, times(1)).onNext("three");
+            verify(aObserver, Mockito.never()).onError(any(Exception.class));
+            verify(aObserver, times(1)).onCompleted();
+        }
+
+        @Test
+        public void testError() {
+        	AsyncSubject subject = AsyncSubject.create();
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+            subject.onError(testException);
+            subject.onNext("four");
+            subject.onError(new Exception());
+            subject.onCompleted();
+
+            assertErrorObserver(aObserver);
+        }
+
+        private void assertErrorObserver(Observer aObserver)
+        {
+            verify(aObserver, Mockito.never()).onNext(anyString());
+            verify(aObserver, times(1)).onError(testException);
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @Test
+        public void testUnsubscribeBeforeCompleted() {
+        	AsyncSubject subject = AsyncSubject.create();
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            Subscription subscription = subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+
+            subscription.unsubscribe();
+            assertNoOnNextEventsReceived(aObserver);
+
+            subject.onNext("three");
+            subject.onCompleted();
+
+            assertNoOnNextEventsReceived(aObserver);
+        }
+
+        private void assertNoOnNextEventsReceived(Observer aObserver)
+        {
+            verify(aObserver, Mockito.never()).onNext(anyString());
+            verify(aObserver, Mockito.never()).onError(any(Exception.class));
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @Test
+        public void testUnsubscribe()
+        {
+            UnsubscribeTester.test(new Func0>()
+            {
+                @Override
+                public AsyncSubject call()
+                {
+                    return AsyncSubject.create();
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(AsyncSubject DefaultSubject)
+                {
+                    DefaultSubject.onCompleted();
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(AsyncSubject DefaultSubject)
+                {
+                    DefaultSubject.onError(new Exception());
+                }
+            }, 
+            null);
+        }
+    }
+}