From c2baf9389f66e8f333243fc1b9c4cb170da3a4d4 Mon Sep 17 00:00:00 2001 From: Vadim Spivak Date: Wed, 22 Jan 2014 10:34:58 -0800 Subject: [PATCH 1/2] Fix Zip race condition ItemObserver onNext might not acquire the write lock due to an onCompleted being handled by another thread. When handling onCompleted, the ItemObserver does not check for any values that are ready to be emitted, which might cause OperationZip to never emit OnNext or OnCompleted. --- .../main/java/rx/operators/OperationZip.java | 99 ++++++++----------- 1 file changed, 42 insertions(+), 57 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 7aac3ce962..9dd5637a6c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -103,8 +103,7 @@ public static OnSubscribeFunc zip(Obs } public static OnSubscribeFunc zip(Iterable> ws, final FuncN zipFunction) { - ManyObservables a = new ManyObservables(ws, zipFunction); - return a; + return new ManyObservables(ws, zipFunction); } /** @@ -246,7 +245,6 @@ public ItemObserver( this.cancel = cancel; } - @SuppressWarnings("unchecked") @Override public void onNext(T value) { rwLock.readLock().lock(); @@ -258,43 +256,7 @@ public void onNext(T value) { } finally { rwLock.readLock().unlock(); } - // run collector - if (rwLock.writeLock().tryLock()) { - boolean cu = false; - try { - while (true) { - List values = new ArrayList(all.size()); - for (ItemObserver io : all) { - if (io.queue.isEmpty()) { - if (io.done) { - observer.onCompleted(); - cu = true; - return; - } - continue; - } - Object v = io.queue.peek(); - if (v == NULL_SENTINEL) { - v = null; - } - values.add((T) v); - } - if (values.size() == all.size()) { - for (ItemObserver io : all) { - io.queue.poll(); - } - observer.onNext(values); - } else { - break; - } - } - } finally { - rwLock.writeLock().unlock(); - if (cu) { - cancel.unsubscribe(); - } - } - } + runCollector(); } @Override @@ -321,23 +283,7 @@ public void onCompleted() { } finally { rwLock.readLock().unlock(); } - if (rwLock.writeLock().tryLock()) { - boolean cu = false; - try { - for (ItemObserver io : all) { - if (io.queue.isEmpty() && io.done) { - observer.onCompleted(); - cu = true; - return; - } - } - } finally { - rwLock.writeLock().unlock(); - if (cu) { - cancel.unsubscribe(); - } - } - } + runCollector(); unsubscribe(); } @@ -351,6 +297,45 @@ public void unsubscribe() { toSource.unsubscribe(); } + @SuppressWarnings("unchecked") + private void runCollector() { + if (rwLock.writeLock().tryLock()) { + boolean cu = false; + try { + while (true) { + List values = new ArrayList(all.size()); + for (ItemObserver io : all) { + if (io.queue.isEmpty()) { + if (io.done) { + observer.onCompleted(); + cu = true; + return; + } + continue; + } + Object v = io.queue.peek(); + if (v == NULL_SENTINEL) { + v = null; + } + values.add((T) v); + } + if (values.size() == all.size()) { + for (ItemObserver io : all) { + io.queue.poll(); + } + observer.onNext(values); + } else { + break; + } + } + } finally { + rwLock.writeLock().unlock(); + if (cu) { + cancel.unsubscribe(); + } + } + } + } } } From f3dbf3c03919e39c2ce6a2e3d94216534a7bb5c8 Mon Sep 17 00:00:00 2001 From: Vadim Spivak Date: Wed, 22 Jan 2014 13:39:50 -0800 Subject: [PATCH 2/2] Simplify OperationZip ItemObserver LinkedList permits null values, no need for NULL_SENTINEL. --- .../src/main/java/rx/operators/OperationZip.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 9dd5637a6c..5b05946913 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -202,11 +202,9 @@ private static final class ItemObserver implements Observer, Subscription /** Reader-writer lock. */ protected final ReadWriteLock rwLock; /** The queue. */ - public final Queue queue = new LinkedList(); + public final Queue queue = new LinkedList(); /** The list of the other observers. */ public final List> all; - /** The null sentinel value. */ - protected static final Object NULL_SENTINEL = new Object(); /** The global cancel. */ protected final Subscription cancel; /** The subscription to the source. */ @@ -252,7 +250,7 @@ public void onNext(T value) { if (done) { return; } - queue.add(value != null ? value : NULL_SENTINEL); + queue.add(value); } finally { rwLock.readLock().unlock(); } @@ -297,7 +295,6 @@ public void unsubscribe() { toSource.unsubscribe(); } - @SuppressWarnings("unchecked") private void runCollector() { if (rwLock.writeLock().tryLock()) { boolean cu = false; @@ -311,13 +308,10 @@ private void runCollector() { cu = true; return; } - continue; + } else { + T value = io.queue.peek(); + values.add(value); } - Object v = io.queue.peek(); - if (v == NULL_SENTINEL) { - v = null; - } - values.add((T) v); } if (values.size() == all.size()) { for (ItemObserver io : all) {