Skip to content

Commit

Permalink
Don't prevent using other call adapters in CircuitBreakerCallAdapter (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
apottere authored and RobWin committed May 31, 2019
1 parent ec72266 commit 34d1204
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 154 deletions.
5 changes: 3 additions & 2 deletions libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ ext {
retrofit: "com.squareup.retrofit2:retrofit:${retrofitVersion}",
retrofit_test: "com.squareup.retrofit2:converter-scalars:${retrofitVersion}",
retrofit_wiremock: "com.github.tomakehurst:wiremock:${wiremockVersion}",

retrofit_rxjava: "com.squareup.retrofit2:adapter-rxjava2:2.3.0",

// Feign addon
feign: "io.github.openfeign:feign-core:${feignVersion}",
feign_wiremock: "com.github.tomakehurst:wiremock:${wiremockVersion}",

// Metrics addon
metrics: "io.dropwizard.metrics:metrics-core:${metricsVersion}",

Expand Down
3 changes: 2 additions & 1 deletion resilience4j-retrofit/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ dependencies {
testCompile ( libraries.retrofit_test )
testCompile ( libraries.retrofit_wiremock )
testCompile ( libraries.retrofit )
testCompile ( libraries.retrofit_rxjava )
}
ext.moduleName='io.github.resilience4j.retrofit'
ext.moduleName='io.github.resilience4j.retrofit'
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import retrofit2.Retrofit;

import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Predicate;

Expand Down Expand Up @@ -63,30 +62,20 @@ private CircuitBreakerCallAdapter(final CircuitBreaker circuitBreaker, final Pre
}

@Override
public CallAdapter<?,?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
@SuppressWarnings("unchecked")
CallAdapter<Object, Object> nextAdapter = (CallAdapter<Object, Object>) retrofit.nextCallAdapter(this, returnType, annotations);

final Type responseType = getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
return new CallAdapter<Object, Object>() {
@Override
public Type responseType() {
return responseType;
return nextAdapter.responseType();
}

@Override
public Call<Object> adapt(Call<Object> call) {
return RetrofitCircuitBreaker.decorateCall(circuitBreaker, call, successResponse);
public Object adapt(Call<Object> call) {
return nextAdapter.adapt(RetrofitCircuitBreaker.decorateCall(circuitBreaker, call, successResponse));
}
};
}

private static Type getCallResponseType(Type returnType) {
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalArgumentException("Call return type must be parameterized as Call<Foo> or Call<? extends Foo>");
}
return getParameterUpperBound(0, (ParameterizedType) returnType);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import retrofit2.Retrofit;

import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
Expand All @@ -51,29 +50,19 @@ private RateLimiterCallAdapter(final RateLimiter rateLimiter) {

@Override
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}
@SuppressWarnings("unchecked")
CallAdapter<Object, Object> nextAdapter = (CallAdapter<Object, Object>) retrofit.nextCallAdapter(this, returnType, annotations);

final Type responseType = getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
return new CallAdapter<Object, Object>() {
@Override
public Type responseType() {
return responseType;
return nextAdapter.responseType();
}

@Override
public Call<Object> adapt(Call<Object> call) {
return RetrofitRateLimiter.decorateCall(rateLimiter, call);
public Object adapt(Call<Object> call) {
return nextAdapter.adapt(RetrofitRateLimiter.decorateCall(rateLimiter, call));
}
};
}

private static Type getCallResponseType(Type returnType) {
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalArgumentException("Call return type must be parameterized as Call<Foo> or Call<? extends Foo>");
}
return getParameterUpperBound(0, (ParameterizedType) returnType);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,59 +49,75 @@ public interface RetrofitCircuitBreaker {
* @return Original Call decorated with CircuitBreaker
*/
static <T> Call<T> decorateCall(final CircuitBreaker circuitBreaker, final Call<T> call, final Predicate<Response> responseSuccess) {
return new DecoratedCall<T>(call) {
return new CircuitBreakingCall<>(call, circuitBreaker, responseSuccess);
}

@Override
public void enqueue(final Callback<T> callback) {
try {
circuitBreaker.acquirePermission();
} catch (CallNotPermittedException cb) {
callback.onFailure(call, cb);
return;
}
class CircuitBreakingCall<T> extends DecoratedCall<T> {
private final Call<T> call;
private final CircuitBreaker circuitBreaker;
private final Predicate<Response> responseSuccess;

final StopWatch stopWatch = StopWatch.start();
call.enqueue(new Callback<T>() {
@Override
public void onResponse(final Call<T> call, final Response<T> response) {
if (responseSuccess.test(response)) {
circuitBreaker.onSuccess(stopWatch.stop().toNanos());
} else {
final Throwable throwable = new Throwable("Response error: HTTP " + response.code() + " - " + response.message());
circuitBreaker.onError(stopWatch.stop().toNanos(), throwable);
}
callback.onResponse(call, response);
}
public CircuitBreakingCall(Call<T> call, CircuitBreaker circuitBreaker, Predicate<Response> responseSuccess) {
super(call);
this.call = call;
this.circuitBreaker = circuitBreaker;
this.responseSuccess = responseSuccess;
}

@Override
public void onFailure(final Call<T> call, final Throwable t) {
circuitBreaker.onError(stopWatch.stop().toNanos(), t);
callback.onFailure(call, t);
}
});
}

@Override
public Response<T> execute() throws IOException {
@Override
public void enqueue(final Callback<T> callback) {
try {
circuitBreaker.acquirePermission();
final StopWatch stopWatch = StopWatch.start();
try {
final Response<T> response = call.execute();
} catch (CallNotPermittedException cb) {
callback.onFailure(call, cb);
return;
}

final StopWatch stopWatch = StopWatch.start();
call.enqueue(new Callback<T>() {
@Override
public void onResponse(final Call<T> call, final Response<T> response) {
if (responseSuccess.test(response)) {
circuitBreaker.onSuccess(stopWatch.stop().toNanos());
} else {
final Throwable throwable = new Throwable("Response error: HTTP " + response.code() + " - " + response.message());
circuitBreaker.onError(stopWatch.stop().toNanos(), throwable);
}
callback.onResponse(call, response);
}

@Override
public void onFailure(final Call<T> call, final Throwable t) {
circuitBreaker.onError(stopWatch.stop().toNanos(), t);
callback.onFailure(call, t);
}
});
}

@Override
public Response<T> execute() throws IOException {
circuitBreaker.acquirePermission();
final StopWatch stopWatch = StopWatch.start();
try {
final Response<T> response = call.execute();

return response;
} catch (Exception exception) {
circuitBreaker.onError(stopWatch.stop().toNanos(), exception);
throw exception;
if (responseSuccess.test(response)) {
circuitBreaker.onSuccess(stopWatch.stop().toNanos());
} else {
final Throwable throwable = new Throwable("Response error: HTTP " + response.code() + " - " + response.message());
circuitBreaker.onError(stopWatch.stop().toNanos(), throwable);
}

return response;
} catch (Exception exception) {
circuitBreaker.onError(stopWatch.stop().toNanos(), exception);
throw exception;
}
};
}
}

}
@Override
public Call<T> clone() {
return new CircuitBreakingCall<>(call.clone(), circuitBreaker, responseSuccess);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,42 +53,57 @@ public interface RetrofitRateLimiter {
* @return Original Call decorated with CircuitBreaker
*/
static <T> Call<T> decorateCall(final RateLimiter rateLimiter, final Call<T> call) {
return new DecoratedCall<T>(call) {
@Override
public void enqueue(final Callback<T> callback) {
try {
RateLimiter.waitForPermission(rateLimiter);
} catch (RequestNotPermitted | IllegalStateException e) {
callback.onResponse(call, tooManyRequestsError());
return;
}
return new RateLimitingCall<>(call, rateLimiter);
}

call.enqueue(callback);
}
class RateLimitingCall<T> extends DecoratedCall<T> {
private final Call<T> call;
private final RateLimiter rateLimiter;

@Override
public Response<T> execute() throws IOException {
CheckedFunction0<Response<T>> restrictedSupplier = RateLimiter.decorateCheckedSupplier(rateLimiter, call::execute);
final Try<Response<T>> response = Try.of(restrictedSupplier);
return response.isSuccess() ? response.get() : handleFailure(response);
}
public RateLimitingCall(Call<T> call, RateLimiter rateLimiter) {
super(call);
this.call = call;
this.rateLimiter = rateLimiter;
}

private Response<T> handleFailure(Try<Response<T>> response) throws IOException {
try {
throw response.getCause();
} catch (RequestNotPermitted | IllegalStateException e) {
return tooManyRequestsError();
} catch (IOException ioe) {
throw ioe;
} catch (Throwable t) {
throw new RuntimeException("Exception executing call", t);
}
@Override
public void enqueue(final Callback<T> callback) {
try {
RateLimiter.waitForPermission(rateLimiter);
} catch (RequestNotPermitted | IllegalStateException e) {
callback.onResponse(call, tooManyRequestsError());
return;
}

private Response<T> tooManyRequestsError() {
return Response.error(429, ResponseBody.create(MediaType.parse("text/plain"), "Too many requests for the client"));
call.enqueue(callback);
}

@Override
public Response<T> execute() throws IOException {
CheckedFunction0<Response<T>> restrictedSupplier = RateLimiter.decorateCheckedSupplier(rateLimiter, call::execute);
final Try<Response<T>> response = Try.of(restrictedSupplier);
return response.isSuccess() ? response.get() : handleFailure(response);
}

private Response<T> handleFailure(Try<Response<T>> response) throws IOException {
try {
throw response.getCause();
} catch (RequestNotPermitted | IllegalStateException e) {
return tooManyRequestsError();
} catch (IOException ioe) {
throw ioe;
} catch (Throwable t) {
throw new RuntimeException("Exception executing call", t);
}
};
}
}

}
private Response<T> tooManyRequestsError() {
return Response.error(429, ResponseBody.create(MediaType.parse("text/plain"), "Too many requests for the client"));
}

@Override
public Call<T> clone() {
return new RateLimitingCall<>(call.clone(), rateLimiter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* @param <T> Call parameter type
*/
public class DecoratedCall<T> implements Call<T> {
public abstract class DecoratedCall<T> implements Call<T> {

private final Call<T> call;

Expand Down Expand Up @@ -65,9 +65,7 @@ public boolean isCanceled() {
}

@Override
public Call<T> clone() {
return new DecoratedCall<>(call.clone());
}
public abstract Call<T> clone();

@Override
public Request request() {
Expand Down
Loading

0 comments on commit 34d1204

Please sign in to comment.