Skip to content

Commit

Permalink
Issue ReactiveX#383: Added Retry support for reactor (ReactiveX#390)
Browse files Browse the repository at this point in the history
* retry reactor ReactiveX#383

* retry reactor ReactiveX#383

* review comments

* javadoc update
  • Loading branch information
Romeh authored and RobWin committed Apr 3, 2019
1 parent 34074be commit 071ce72
Show file tree
Hide file tree
Showing 5 changed files with 500 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.reactor.retry;

/**
* retry runtime exception wrapper for the thrown checked exception
*/
public class RetryExceptionWrapper extends RuntimeException {
public RetryExceptionWrapper(Exception e) {
super(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2019 Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.reactor.retry;

import java.util.function.Consumer;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import io.github.resilience4j.retry.Retry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* A Reactor Retry operator which wraps a reactive type in a Retry.
* @param <T> the value type of the upstream and downstream
*/
public class RetryOperator<T> implements Function<Publisher<T>, Publisher<T>> {

private final Retry retry;

private RetryOperator(Retry retry) {
this.retry = retry;
}

/**
* Creates a retry.
*
* @param <T> the value type of the upstream and downstream
* @param retry the retry
* @return a RetryOperator
*/
public static <T> RetryOperator<T> of(Retry retry) {
return new RetryOperator<>(retry);
}

@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
Mono<T> upstream = (Mono<T>) publisher;
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(throwingConsumerWrapper(context::onError)))
.doOnSuccess(t -> context.onComplete());
} else if (publisher instanceof Flux) {
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
Flux<T> upstream = (Flux<T>) publisher;
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(throwingConsumerWrapper(context::onError)))
.doOnComplete(context::onComplete);
}
throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
}


private static class Context<T> {
private final Retry.Context<T> context;

Context(Retry.Context<T> context) {
this.context = context;
}

void onComplete() {
this.context.onSuccess();
}

void throwExceptionToForceRetryOnResult(T value) {
if (context.onResult(value))
throw new RetryDueToResultException();
}

void onError(Throwable throwable) throws Exception {
if (throwable instanceof RetryDueToResultException) return;
// Filter Error to not retry on it
if (throwable instanceof Error) {
throw (Error) throwable;
}
try {
if (throwable instanceof RetryExceptionWrapper) {
context.onError(castToException(throwable.getCause()));
} else {
context.onError(castToException(throwable));
}

} catch (Throwable t) {
throw castToException(t);
}
}

private Exception castToException(Throwable throwable) {
return throwable instanceof Exception ? (Exception) throwable : new Exception(throwable);
}

private static class RetryDueToResultException extends RuntimeException {
RetryDueToResultException() {
super("retry due to retryOnResult predicate");
}
}
}

/**
* @param <T> input
* @param <E> possible thrown exception
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Exception> {
void accept(T t) throws E;
}

/**
* to handle checked exception handling in reactor Function java 8 doOnNext
*/
private static <T> Consumer<T> throwingConsumerWrapper(
ThrowingConsumer<T, Exception> throwingConsumer) {

return i -> {
try {
throwingConsumer.accept(i);
} catch (Exception ex) {
throw new RetryExceptionWrapper(ex);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package io.github.resilience4j.reactor;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.junit.Test;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
Expand All @@ -10,16 +16,14 @@
import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.junit.Test;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class CombinedOperatorsTest {

private final RateLimiter rateLimiter = RateLimiter.of("test",
Expand All @@ -35,6 +39,10 @@ public class CombinedOperatorsTest {
private Bulkhead bulkhead = Bulkhead
.of("test", BulkheadConfig.custom().maxConcurrentCalls(1).maxWaitTime(0).build());

private final RetryConfig config = io.github.resilience4j.retry.RetryConfig.ofDefaults();
private final Retry retry = Retry.of("testName", config);
private final RetryOperator<String> retryOperator = RetryOperator.of(retry);

@Test
public void shouldEmitEvents() {
StepVerifier.create(
Expand All @@ -47,6 +55,19 @@ public void shouldEmitEvents() {
.verifyComplete();
}

@Test
public void shouldEmitEventsWithRetry() {
StepVerifier.create(
Flux.just("Event 1", "Event 2")
.transform(retryOperator)
.transform(BulkheadOperator.of(bulkhead))
.transform(RateLimiterOperator.of(rateLimiter))
.transform(CircuitBreakerOperator.of(circuitBreaker))
).expectNext("Event 1")
.expectNext("Event 2")
.verifyComplete();
}

@Test
public void shouldEmitEvent() {
StepVerifier.create(
Expand All @@ -58,6 +79,18 @@ public void shouldEmitEvent() {
.verifyComplete();
}

@Test
public void shouldEmitEventWithRetry() {
StepVerifier.create(
Mono.just("Event 1")
.transform(retryOperator)
.transform(BulkheadOperator.of(bulkhead))
.transform(RateLimiterOperator.of(rateLimiter))
.transform(CircuitBreakerOperator.of(circuitBreaker))
).expectNext("Event 1")
.verifyComplete();
}

@Test
public void shouldPropagateError() {
StepVerifier.create(
Expand Down
Loading

0 comments on commit 071ce72

Please sign in to comment.