From 87b47730f7778d07f1f49c33b648aee0dd08a67b Mon Sep 17 00:00:00 2001 From: James Barr Date: Sun, 19 Feb 2017 17:34:16 -0800 Subject: [PATCH] 2.x: fix Observable.zip to dispose eagerly --- .../operators/observable/ObservableZip.java | 25 +++++++++---- .../observable/ObservableZipTest.java | 37 ++++++++++++++++++- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index 7031a36e30..2227849571 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -115,6 +115,7 @@ public void subscribe(ObservableSource[] sources, int bufferSize) { public void dispose() { if (!cancelled) { cancelled = true; + cancelSources(); if (getAndIncrement() == 0) { clear(); } @@ -126,9 +127,19 @@ public boolean isDisposed() { return cancelled; } - void clear() { + void cancel() { + clear(); + cancelSources(); + } + + void cancelSources() { for (ZipObserver zs : observers) { zs.dispose(); + } + } + + void clear() { + for (ZipObserver zs : observers) { zs.queue.clear(); } } @@ -168,7 +179,7 @@ public void drain() { if (z.done && !delayError) { Throwable ex = z.error; if (ex != null) { - clear(); + cancel(); a.onError(ex); return; } @@ -186,7 +197,7 @@ public void drain() { v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - clear(); + cancel(); a.onError(ex); return; } @@ -205,7 +216,7 @@ public void drain() { boolean checkTerminated(boolean d, boolean empty, Observer a, boolean delayError, ZipObserver source) { if (cancelled) { - clear(); + cancel(); return true; } @@ -213,7 +224,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean if (delayError) { if (empty) { Throwable e = source.error; - clear(); + cancel(); if (e != null) { a.onError(e); } else { @@ -224,12 +235,12 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean } else { Throwable e = source.error; if (e != null) { - clear(); + cancel(); a.onError(e); return true; } else if (empty) { - clear(); + cancel(); a.onComplete(); return true; } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java index 73e5d936fe..475cceeca7 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java @@ -1392,4 +1392,39 @@ public List apply(Object t1, Object t2) throws Exception { assertTrue(list.toString(), list.contains("RxSi")); assertTrue(list.toString(), list.contains("RxCo")); } - }} + } + + @Test + public void eagerDispose() { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestObserver ts = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + if (ps1.hasObservers()) { + onError(new IllegalStateException("ps1 not disposed")); + } else + if (ps2.hasObservers()) { + onError(new IllegalStateException("ps2 not disposed")); + } else { + onComplete(); + } + } + }; + + Observable.zip(ps1, ps2, new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }) + .subscribe(ts); + + ps1.onNext(1); + ps2.onNext(2); + ts.assertResult(3); + } +}