Skip to content

Commit

Permalink
2.x: more eager cancellation in flatMapX (#5422)
Browse files Browse the repository at this point in the history
* 2.x: more eager cancellation in flatMapX

* Add more eager check to Observable.flatMapX
  • Loading branch information
akarnokd authored Jun 17, 2017
1 parent b7086ef commit 73a85c1
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueSubs

Subscription s;

volatile boolean cancelled;

FlatMapCompletableMainSubscriber(Subscriber<? super T> observer,
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
int maxConcurrency) {
Expand Down Expand Up @@ -117,7 +119,7 @@ public void onNext(T value) {

InnerConsumer inner = new InnerConsumer();

if (set.add(inner)) {
if (!cancelled && set.add(inner)) {
cs.subscribe(inner);
}
}
Expand Down Expand Up @@ -164,6 +166,7 @@ public void onComplete() {

@Override
public void cancel() {
cancelled = true;
s.cancel();
set.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends AtomicInteger

Subscription s;

volatile boolean disposed;

FlatMapCompletableMainSubscriber(CompletableObserver observer,
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
int maxConcurrency) {
Expand Down Expand Up @@ -124,7 +126,7 @@ public void onNext(T value) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!disposed && set.add(inner)) {
cs.subscribe(inner);
}
}
Expand Down Expand Up @@ -171,6 +173,7 @@ public void onComplete() {

@Override
public void dispose() {
disposed = true;
s.cancel();
set.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void onNext(T t) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!cancelled && set.add(inner)) {
ms.subscribe(inner);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void onNext(T t) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!cancelled && set.add(inner)) {
ms.subscribe(inner);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ static final class FlatMapCompletableMainObserver<T> extends BasicIntQueueDispos

Disposable d;

volatile boolean disposed;

FlatMapCompletableMainObserver(Observer<? super T> observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
this.actual = observer;
this.mapper = mapper;
Expand Down Expand Up @@ -99,7 +101,7 @@ public void onNext(T value) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!disposed && set.add(inner)) {
cs.subscribe(inner);
}
}
Expand Down Expand Up @@ -138,6 +140,7 @@ public void onComplete() {

@Override
public void dispose() {
disposed = true;
d.dispose();
set.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ static final class FlatMapCompletableMainObserver<T> extends AtomicInteger imple

Disposable d;

volatile boolean disposed;

FlatMapCompletableMainObserver(CompletableObserver observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
this.actual = observer;
this.mapper = mapper;
Expand Down Expand Up @@ -104,7 +106,7 @@ public void onNext(T value) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!disposed && set.add(inner)) {
cs.subscribe(inner);
}
}
Expand Down Expand Up @@ -143,6 +145,7 @@ public void onComplete() {

@Override
public void dispose() {
disposed = true;
d.dispose();
set.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onNext(T t) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!cancelled && set.add(inner)) {
ms.subscribe(inner);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onNext(T t) {

InnerObserver inner = new InnerObserver();

if (set.add(inner)) {
if (!cancelled && set.add(inner)) {
ms.subscribe(inner);
}
}
Expand Down

0 comments on commit 73a85c1

Please sign in to comment.