diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index 9719c4216d1..e65b9566463 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -40,7 +40,6 @@ public static Func1>, Subscription> toObservableList(Observ private static class ToObservableList implements Func1>, Subscription> { private final Observable that; - final ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); public ToObservableList(Observable that) { this.that = that; @@ -49,6 +48,7 @@ public ToObservableList(Observable that) { public Subscription call(final Observer> observer) { return that.subscribe(new Observer() { + final ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); public void onNext(T value) { // onNext can be concurrently executed so list must be thread-safe list.add(value); @@ -94,5 +94,28 @@ public void testList() { verify(aObserver, Mockito.never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testListMultipleObservers() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable> observable = Observable.create(toObservableList(w)); + + @SuppressWarnings("unchecked") + Observer> o1 = mock(Observer.class); + observable.subscribe(o1); + + Observer> o2 = mock(Observer.class); + observable.subscribe(o2); + + List expected = Arrays.asList("one", "two", "three"); + + verify(o1, times(1)).onNext(expected); + verify(o1, Mockito.never()).onError(any(Exception.class)); + verify(o1, times(1)).onCompleted(); + + verify(o2, times(1)).onNext(expected); + verify(o2, Mockito.never()).onError(any(Exception.class)); + verify(o2, times(1)).onCompleted(); + } } } \ No newline at end of file