From 30b9296d7df4d413f5fb433bfcf2c70c5c0c9768 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 17 Nov 2014 10:23:19 +0100 Subject: [PATCH 1/2] Fixed unsubscription issue in buffer with time and a potential concurrency issue in merge. --- .../operators/OperatorBufferWithTime.java | 12 ++++--- .../rx/internal/operators/OperatorMerge.java | 2 +- .../operators/OperatorBufferTest.java | 34 +++++++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java index 8111420e20..d0954f88b3 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java @@ -72,15 +72,19 @@ public OperatorBufferWithTime(long timespan, long timeshift, TimeUnit unit, int @Override public Subscriber call(final Subscriber> child) { final Worker inner = scheduler.createWorker(); - child.add(inner); + SerializedSubscriber> serialized = new SerializedSubscriber>(child); if (timespan == timeshift) { - ExactSubscriber bsub = new ExactSubscriber(new SerializedSubscriber>(child), inner); + ExactSubscriber bsub = new ExactSubscriber(serialized, inner); + bsub.add(inner); + child.add(bsub); bsub.scheduleExact(); return bsub; } - InexactSubscriber bsub = new InexactSubscriber(new SerializedSubscriber>(child), inner); + InexactSubscriber bsub = new InexactSubscriber(serialized, inner); + bsub.add(inner); + child.add(bsub); bsub.startNewChunk(); bsub.scheduleChunk(); return bsub; @@ -94,7 +98,6 @@ final class InexactSubscriber extends Subscriber { /** Guarded by this. */ boolean done; public InexactSubscriber(Subscriber> child, Worker inner) { - super(child); this.child = child; this.inner = inner; this.chunks = new LinkedList>(); @@ -219,7 +222,6 @@ final class ExactSubscriber extends Subscriber { /** Guarded by this. */ boolean done; public ExactSubscriber(Subscriber> child, Worker inner) { - super(child); this.child = child; this.inner = inner; this.chunk = new ArrayList(); diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 4182ab9fca..bbc132cda6 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -501,7 +501,7 @@ public void request(long n) { REQUESTED.getAndAdd(this, n); if (ms.drainQueuesIfNeeded()) { boolean sendComplete = false; - synchronized (this) { + synchronized (ms) { if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) { sendComplete = true; } diff --git a/src/test/java/rx/internal/operators/OperatorBufferTest.java b/src/test/java/rx/internal/operators/OperatorBufferTest.java index 85af16ab1d..8e0a9b614e 100644 --- a/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -981,4 +981,38 @@ public void onNext(List t) { }); assertEquals(Long.MAX_VALUE, requested.get()); } + @Test(timeout = 3000) + public void testBufferWithTimeDoesntUnsubscribeDownstream() throws InterruptedException { + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + + final CountDownLatch cdl = new CountDownLatch(1); + Subscriber s = new Subscriber() { + @Override + public void onNext(Object t) { + o.onNext(t); + } + @Override + public void onError(Throwable e) { + o.onError(e); + cdl.countDown(); + } + @Override + public void onCompleted() { + o.onCompleted(); + cdl.countDown(); + } + }; + + Observable.range(1, 1).delay(1, TimeUnit.SECONDS).buffer(2, TimeUnit.SECONDS).unsafeSubscribe(s); + + cdl.await(); + + verify(o).onNext(Arrays.asList(1)); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + assertFalse(s.isUnsubscribed()); + } } From b0aeb620ec6c7a396f6edd95a782648f6a005ab0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 17 Nov 2014 10:26:44 +0100 Subject: [PATCH 2/2] Whitespace fix. --- src/main/java/rx/internal/operators/OperatorBufferWithTime.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java index d0954f88b3..3b2dd63704 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java @@ -75,7 +75,7 @@ public Subscriber call(final Subscriber> child) { SerializedSubscriber> serialized = new SerializedSubscriber>(child); if (timespan == timeshift) { - ExactSubscriber bsub = new ExactSubscriber(serialized, inner); + ExactSubscriber bsub = new ExactSubscriber(serialized, inner); bsub.add(inner); child.add(bsub); bsub.scheduleExact();