From e3ae4ff7b59f021e5e65235cdccb21ceb365a39d Mon Sep 17 00:00:00 2001 From: Klemen Kresnik Date: Tue, 9 Feb 2016 14:47:58 +0100 Subject: [PATCH] Added retry and retryWhen support for Single. --- src/main/java/rx/Single.java | 242 +++++++++++++++++++++---------- src/test/java/rx/SingleTest.java | 187 ++++++++++++++++++------ 2 files changed, 309 insertions(+), 120 deletions(-) diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index a768779a4df..4e444c43f5d 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1,50 +1,37 @@ /** * Copyright 2015 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package rx; -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import rx.Observable.Operator; +import rx.annotations.Beta; import rx.annotations.Experimental; import rx.exceptions.Exceptions; import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.functions.Func3; -import rx.functions.Func4; -import rx.functions.Func5; -import rx.functions.Func6; -import rx.functions.Func7; -import rx.functions.Func8; -import rx.functions.Func9; -import rx.functions.FuncN; -import rx.annotations.Beta; +import rx.functions.*; import rx.internal.operators.*; import rx.internal.producers.SingleDelayedProducer; import rx.internal.util.ScalarSynchronousSingle; import rx.internal.util.UtilityFunctions; -import rx.singles.BlockingSingle; import rx.observers.SafeSubscriber; -import rx.plugins.*; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; +import rx.singles.BlockingSingle; import rx.subscriptions.Subscriptions; +import java.util.Collection; +import java.util.concurrent.*; + /** * The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the * implementation of the Reactive Pattern for a stream or vector of values. @@ -61,7 +48,7 @@ *

* For more information see the ReactiveX * documentation. - * + * * @param * the type of the item emitted by the Single * @since (If this class graduates from "Experimental" replace this parenthetical with the release number) @@ -76,7 +63,7 @@ public class Single { *

* Note: Use {@link #create(OnSubscribe)} to create a Single, instead of this constructor, * unless you specifically have a need for inheritance. - * + * * @param f * {@code OnExecute} to be executed when {@code execute(SingleSubscriber)} or * {@code subscribe(Subscriber)} is called @@ -132,7 +119,7 @@ private Single(final Observable.OnSubscribe f) { *

Scheduler:
*
{@code create} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of the item that this Single emits * @param f @@ -142,7 +129,7 @@ private Single(final Observable.OnSubscribe f) { * @see ReactiveX operators documentation: Create */ public static Single create(OnSubscribe f) { - return new Single(f); // TODO need hook + return new Single(f); // TODO need hook } /** @@ -168,14 +155,14 @@ public interface OnSubscribe extends Action1> { *
Scheduler:
*
{@code lift} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param lift * the Operator that implements the Single-operating function to be applied to the source Single * @return a Single that is the result of applying the lifted Operator to the source Single * @see RxJava wiki: Implementing Your Own Operators */ private Single lift(final Operator lift) { - // This method is private because not sure if we want to expose the Observable.Operator in this public API rather than a Single.Operator + // This method is private because not sure if we want to expose the Observable.Operator in this public API rather than a Single.Operator return new Single(new Observable.OnSubscribe() { @Override @@ -214,7 +201,7 @@ public void call(Subscriber o) { *
Scheduler:
*
{@code compose} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param transformer * implements the function that transforms the source Single * @return the source Single, transformed by the transformer function @@ -227,7 +214,7 @@ public Single compose(Transformer transformer) { /** * Transformer function used by {@link #compose}. - * + * * @warn more complete description needed */ public interface Transformer extends Func1, Single> { @@ -246,7 +233,7 @@ private static Observable asObservable(Single t) { /** * INTERNAL: Used with lift and operators. - * + * * Converts the source {@code Single} into an {@code Single>} that emits an Observable * that emits the same emission as the source Single. *

@@ -255,7 +242,7 @@ private static Observable asObservable(Single t) { *

Scheduler:
*
{@code nest} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return a Single that emits an Observable that emits the same item as the source Single * @see ReactiveX operators documentation: To */ @@ -318,7 +305,7 @@ public static Observable concat(Single t1, SingleScheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be concatenated * @param t2 @@ -426,7 +413,7 @@ public static Observable concat(Single t1, SingleScheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be concatenated * @param t2 @@ -458,7 +445,7 @@ public static Observable concat(Single t1, SingleScheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be concatenated * @param t2 @@ -493,7 +480,7 @@ public static Observable concat(Single t1, SingleScheduler: *
{@code error} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param exception * the particular Throwable to pass to {@link SingleSubscriber#onError onError} * @param @@ -527,7 +514,7 @@ public void call(SingleSubscriber te) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param @@ -554,7 +541,7 @@ public static Single from(Future future) { *
Scheduler:
*
{@code from} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param future * the source {@link Future} * @param timeout @@ -583,7 +570,7 @@ public static Single from(Future future, long timeout, TimeU *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param future * the source {@link Future} * @param scheduler @@ -647,7 +634,7 @@ public void call(SingleSubscriber singleSubscriber) { *
Scheduler:
*
{@code just} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param value * the item to emit * @param @@ -712,7 +699,7 @@ public void onError(Throwable error) { *
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -735,7 +722,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -760,7 +747,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -787,7 +774,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -816,7 +803,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -847,7 +834,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -880,7 +867,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -915,7 +902,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code merge} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @param t2 @@ -950,7 +937,7 @@ public static Observable merge(Single t1, SingleScheduler: *
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -979,7 +966,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1010,7 +997,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1043,7 +1030,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1078,7 +1065,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1116,7 +1103,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1156,7 +1143,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1198,7 +1185,7 @@ public R call(Object... args) { *
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param s1 * the first source Single * @param s2 @@ -1266,7 +1253,7 @@ public static Single zip(Iterable> singles, FuncNScheduler: *
{@code concat} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be concatenated after the current * @return an Observable that emits the item emitted by the source Single, followed by the item emitted by @@ -1286,7 +1273,7 @@ public final Observable concatWith(Single t1) { *
Scheduler:
*
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param func * a function that, when applied to the item emitted by the source Single, returns a Single * @return the Single returned from {@code func} when applied to the item emitted by the source Single @@ -1308,7 +1295,7 @@ public final Single flatMap(final Func1Scheduler: *
{@code flatMapObservable} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param func * a function that, when applied to the item emitted by the source Single, returns an * Observable @@ -1349,7 +1336,7 @@ public final Single map(Func1 func) { *
Scheduler:
*
{@code mergeWith} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param t1 * a Single to be merged * @return an Observable that emits all of the items emitted by the source Singles @@ -1368,7 +1355,7 @@ public final Observable mergeWith(Single t1) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param scheduler * the {@link Scheduler} to notify subscribers on * @return the source Single modified so that its subscribers are notified on the specified @@ -1403,7 +1390,7 @@ public final Single observeOn(Scheduler scheduler) { *
Scheduler:
*
{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param resumeFunction * a function that returns an item that the new Single will emit if the source Single encounters * an error @@ -1451,7 +1438,7 @@ public final Single onErrorResumeNext(Single resumeSingleInCaseO *
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @return a {@link Subscription} reference can request the {@link Single} stop work. * @throws OnErrorNotImplementedException * if the Single tries to call {@link Subscriber#onError} @@ -1484,7 +1471,7 @@ public final void onNext(T args) { *
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onSuccess * the {@code Action1} you have designed to accept the emission from the Single * @return a {@link Subscription} reference can request the {@link Single} stop work. @@ -1526,7 +1513,7 @@ public final void onNext(T args) { *
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param onSuccess * the {@code Action1} you have designed to accept the emission from the Single * @param onError @@ -1576,7 +1563,7 @@ public final void onNext(T args) { *
Scheduler:
*
{@code unsafeSubscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param subscriber * the Subscriber that will handle the emission or notification from the Single */ @@ -1628,7 +1615,7 @@ public final void unsafeSubscribe(Subscriber subscriber) { *
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param subscriber * the {@link Subscriber} that will handle the emission or notification from the Single * @return a {@link Subscription} reference can request the {@link Single} stop work. @@ -1716,7 +1703,7 @@ public final Subscription subscribe(Subscriber subscriber) { *
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param te * the {@link SingleSubscriber} that will handle the emission or notification from the Single * @return a {@link Subscription} reference can request the {@link Single} stop work. @@ -1762,7 +1749,7 @@ public void onNext(T t) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param scheduler * the {@link Scheduler} to perform subscription actions on * @return the source Single modified so that its subscriptions happen on the specified {@link Scheduler} @@ -1811,12 +1798,12 @@ public void onError(Throwable error) { } }); } - + /** * Converts this Single into an {@link Observable}. *

* - * + * * @return an {@link Observable} that emits a single item T. */ public final Observable toObservable() { @@ -1833,7 +1820,7 @@ public final Observable toObservable() { *

Scheduler:
*
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timeout * maximum duration before the Single times out * @param timeUnit @@ -1856,7 +1843,7 @@ public final Single timeout(long timeout, TimeUnit timeUnit) { *
Scheduler:
*
you specify which {@link Scheduler} this operator will use
* - * + * * @param timeout * maximum duration before the Single times out * @param timeUnit @@ -1881,7 +1868,7 @@ public final Single timeout(long timeout, TimeUnit timeUnit, Scheduler schedu *
Scheduler:
*
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
* - * + * * @param timeout * maximum time before a timeout occurs * @param timeUnit @@ -1905,7 +1892,7 @@ public final Single timeout(long timeout, TimeUnit timeUnit, SingleScheduler: *
you specify which {@link Scheduler} this operator will use
* - * + * * @param timeout * maximum duration before a timeout occurs * @param timeUnit @@ -1948,7 +1935,7 @@ public final BlockingSingle toBlocking() { *
Scheduler:
*
{@code zipWith} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param * the type of items emitted by the {@code other} Single * @param @@ -2002,7 +1989,7 @@ public void onNext(T t) { return lift(new OperatorDoOnEach(observer)); } - + /** * Modifies the source {@link Single} so that it invokes an action when it calls {@code onSuccess}. *

@@ -2209,4 +2196,101 @@ static Single[] iterableToArray(final Iterable + * + * If the source Single calls {@link Observer#onError}, this method will resubscribe to the source + * Single rather than propagating the {@code onError} call. + * + *

+ *
Scheduler:
+ *
{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.
+ *
+ * + * @return the source Single modified with retry logic + * @see ReactiveX operators documentation: Retry + */ + public final Single retry() { + return toObservable().retry().toSingle(); + } + + /** + * Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError} + * up to a specified number of retries. + * + * + * + * If the source Single calls {@link Observer#onError}, this method will resubscribe to the source + * Single for a maximum of {@code count} resubscriptions rather than propagating the + * {@code onError} call. + * + *
+ *
Scheduler:
+ *
{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.
+ *
+ * + * @param count + * number of retry attempts before failing + * + * @return the source Single modified with retry logic + * @see ReactiveX operators documentation: Retry + */ + public final Single retry(final long count) { + return toObservable().retry(count).toSingle(); + } + + /** + * Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError} + * and the predicate returns true for that specific exception and retry count. + * + * + *
+ *
Backpressure Support:
+ *
This operator honors backpressure. + *
Scheduler:
+ *
{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.
+ *
+ * + * @param predicate + * the predicate that determines if a resubscription may happen in case of a specific exception + * and retry count + * + * @return the source Single modified with retry logic + * @see #retry() + * @see ReactiveX operators documentation: Retry + */ + public final Single retry(Func2 predicate) { + return toObservable().retry(predicate).toSingle(); + } + + /** + * Returns a Single that emits the same values as the source Single with the exception of an + * {@code onError}. An {@code onError} notification from the source will result in the emission of a + * {@link Throwable} item to the Observable provided as an argument to the {@code notificationHandler} + * function. If that Observable calls {@code onComplete} or {@code onError} then {@code retry} will call + * {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will + * resubscribe to the source Single. + * + * + * + *
+ *
Scheduler:
+ *
{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.
+ *
+ * + * @param notificationHandler + * receives an Observable of notifications with which a user can complete or error, aborting the + * retry + * + * @return the source Observable modified with retry logic + * @see ReactiveX operators documentation: Retry + */ + public final Single retryWhen(final Func1, ? extends Observable> notificationHandler) { + return toObservable().retryWhen(notificationHandler).toSingle(); + } + } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 15de8916365..24862318824 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -12,56 +12,27 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import rx.Single.OnSubscribe; import rx.exceptions.CompositeException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.functions.Func3; -import rx.functions.Func4; -import rx.functions.Func5; -import rx.functions.Func6; -import rx.functions.Func7; -import rx.functions.Func8; -import rx.functions.Func9; -import rx.functions.FuncN; -import rx.schedulers.TestScheduler; -import rx.singles.BlockingSingle; +import rx.functions.*; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.singles.BlockingSingle; import rx.subscriptions.Subscriptions; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + public class SingleTest { @Test @@ -1248,4 +1219,138 @@ public void iterableToArrayShouldConvertSet() { assertSame(s1, singlesArray[0]); assertSame(s2, singlesArray[1]); } + + @Test(timeout = 2000) + public void testRetry() { + TestSubscriber testSubscriber = new TestSubscriber(); + final TestSubscriber retryCounter = new TestSubscriber(); + + final int retryCount = 100; + Callable callable = new Callable() { + + @Override + public String call() throws Exception { + int errors = retryCounter.getOnErrorEvents().size(); + if (errors < retryCount) { + Exception exception = new Exception(); + retryCounter.onError(exception); + throw exception; + } + return null; + } + + }; + + Single.fromCallable(callable) + .retry() + .subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + int numberOfErrors = retryCounter.getOnErrorEvents().size(); + assertEquals(retryCount, numberOfErrors); + } + + @Test(timeout = 2000) + public void testRetryWithCount() { + TestSubscriber testSubscriber = new TestSubscriber(); + final TestSubscriber retryCounter = new TestSubscriber(); + + final int retryCount = 100; + Callable callable = new Callable() { + + @Override + public String call() throws Exception { + int errors = retryCounter.getOnErrorEvents().size(); + if (errors < retryCount) { + Exception exception = new Exception(); + retryCounter.onError(exception); + throw exception; + } + + return null; + } + }; + + Single.fromCallable(callable) + .retry(retryCount) + .subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + int numberOfErrors = retryCounter.getOnErrorEvents().size(); + assertEquals(retryCount, numberOfErrors); + } + + @Test + public void testRetryWithPredicate() { + TestSubscriber testSubscriber = new TestSubscriber(); + final TestSubscriber retryCounter = new TestSubscriber(); + + final int retryCount = 100; + Callable callable = new Callable() { + + @Override + public String call() throws Exception { + int errors = retryCounter.getOnErrorEvents().size(); + if (errors < retryCount) { + IOException exception = new IOException(); + retryCounter.onError(exception); + throw exception; + } + return null; + } + }; + + Single.fromCallable(callable) + .retry(new Func2() { + @Override + public Boolean call(Integer integer, Throwable throwable) { + return throwable instanceof IOException; + } + }) + .subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + int numberOfErrors = retryCounter.getOnErrorEvents().size(); + assertEquals(retryCount, numberOfErrors); + } + + @Test + public void testRetryWhen() { + TestSubscriber testSubscriber = new TestSubscriber(); + final TestSubscriber retryCounter = new TestSubscriber(); + + final int retryCount = 100; + + Callable callable = new Callable() { + + @Override + public String call() throws Exception { + int errors = retryCounter.getOnErrorEvents().size(); + if (errors < retryCount) { + IOException exception = new IOException(); + retryCounter.onError(exception); + throw exception; + } + return null; + } + }; + + Single.fromCallable(callable) + .retryWhen(new Func1, Observable>() { + @Override + public Observable call(Observable observable) { + + return observable.flatMap(new Func1>() { + @Override + public Observable call(Throwable throwable) { + return throwable instanceof IOException ? Observable.just(null) : Observable.error(throwable); + } + }); + } + }) + .subscribe(testSubscriber); + + int numberOfErrors = retryCounter.getOnErrorEvents().size(); + assertEquals(retryCount, numberOfErrors); + } }