From 5f77a31a1de84dda14782afd46be95767a2dce91 Mon Sep 17 00:00:00 2001 From: thegeez Date: Sun, 24 Mar 2013 20:14:58 +0100 Subject: [PATCH 1/2] Failing test case to show Observable.toList breaks with multiple observers due to sharing of the buffer list --- .../operators/OperationToObservableList.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index 9719c4216d..24a2548187 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -94,5 +94,28 @@ public void testList() { verify(aObserver, Mockito.never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testListMultipleObservers() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable> observable = Observable.create(toObservableList(w)); + + @SuppressWarnings("unchecked") + Observer> o1 = mock(Observer.class); + observable.subscribe(o1); + + Observer> o2 = mock(Observer.class); + observable.subscribe(o2); + + List expected = Arrays.asList("one", "two", "three"); + + verify(o1, times(1)).onNext(expected); + verify(o1, Mockito.never()).onError(any(Exception.class)); + verify(o1, times(1)).onCompleted(); + + verify(o2, times(1)).onNext(expected); + verify(o2, Mockito.never()).onError(any(Exception.class)); + verify(o2, times(1)).onCompleted(); + } } } \ No newline at end of file From c21c1e7c4b1f5cddd6e985db9f17ae47035d4dbd Mon Sep 17 00:00:00 2001 From: thegeez Date: Sun, 24 Mar 2013 20:16:25 +0100 Subject: [PATCH 2/2] Fix for Observable.toList failing with multiple subscribers --- .../src/main/java/rx/operators/OperationToObservableList.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index 24a2548187..e65b956646 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -40,7 +40,6 @@ public static Func1>, Subscription> toObservableList(Observ private static class ToObservableList implements Func1>, Subscription> { private final Observable that; - final ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); public ToObservableList(Observable that) { this.that = that; @@ -49,6 +48,7 @@ public ToObservableList(Observable that) { public Subscription call(final Observer> observer) { return that.subscribe(new Observer() { + final ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); public void onNext(T value) { // onNext can be concurrently executed so list must be thread-safe list.add(value);