Skip to content

Commit

Permalink
2.x: Flowable.take to route post-cancel errors to plugin error handler (
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Apr 29, 2018
1 parent dc94f56 commit a1b9628
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.reactivex.*;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableTake<T> extends AbstractFlowableWithUpstream<T, T> {
final long limit;
Expand Down Expand Up @@ -75,6 +76,8 @@ public void onError(Throwable t) {
done = true;
subscription.cancel();
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,19 @@ public void run() {
ts.assertResult(1, 2, 3, 4, 5);
}
}

@Test
public void errorAfterLimitReached() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.error(new TestException())
.limit(0)
.test()
.assertResult();

TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

Expand All @@ -28,6 +29,7 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -495,4 +497,19 @@ public void run() {
ts.assertResult(1, 2);
}
}

@Test
public void errorAfterLimitReached() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.error(new TestException())
.take(0)
.test()
.assertResult();

TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.mockito.InOrder;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

Expand Down Expand Up @@ -389,4 +393,19 @@ public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
}
});
}

@Test
public void errorAfterLimitReached() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Observable.error(new TestException())
.take(0)
.test()
.assertResult();

TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}

0 comments on commit a1b9628

Please sign in to comment.