From 6c958e35da9ade5a44b4f5ea204e900230d07401 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 12 Dec 2014 11:18:01 +0800 Subject: [PATCH] Fix the issue that Sample doesn't call 'unsubscribe' --- .../operators/OperatorSampleWithTime.java | 1 + .../operators/OperatorSampleTest.java | 21 +++++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java index ea94a7db21..a476dfd96a 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java @@ -68,6 +68,7 @@ static final class SamplerSubscriber extends Subscriber implements Action0 static final AtomicReferenceFieldUpdater VALUE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value"); public SamplerSubscriber(Subscriber subscriber) { + super(subscriber); this.subscriber = subscriber; } diff --git a/src/test/java/rx/internal/operators/OperatorSampleTest.java b/src/test/java/rx/internal/operators/OperatorSampleTest.java index 0a8c9da58d..815d002061 100644 --- a/src/test/java/rx/internal/operators/OperatorSampleTest.java +++ b/src/test/java/rx/internal/operators/OperatorSampleTest.java @@ -28,14 +28,12 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Observer; -import rx.Scheduler; -import rx.Subscriber; import rx.functions.Action0; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; +import rx.subscriptions.Subscriptions; public class OperatorSampleTest { private TestScheduler scheduler; @@ -271,4 +269,19 @@ public void sampleWithSamplerThrows() { inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class)); verify(observer, never()).onCompleted(); } + + @Test + public void testSampleUnsubscribe() { + final Subscription s = mock(Subscription.class); + Observable o = Observable.create( + new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + subscriber.add(s); + } + } + ); + o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe(); + verify(s).unsubscribe(); + } }