Skip to content

Commit

Permalink
Merge pull request Azure#252 from Azure/RefactorRestProxy
Browse files Browse the repository at this point in the history
Make RestProxy sync scenarios go through async scenarios (one code path)
  • Loading branch information
Dan Schulte authored Oct 4, 2017
2 parents 4a1f9a2 + 449ff5d commit 77b328a
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ else if (pollingSucceeded) {
return new HttpRequest(fullyQualifiedMethodName, "GET", pollUrl);
}

@Override
public void updateFrom(HttpResponse httpPollResponse) throws IOException {
updateFromAsync(httpPollResponse).toBlocking().value();
}

@Override
public Single<HttpResponse> updateFromAsync(final HttpResponse httpPollResponse) {
updateDelayInMillisecondsFrom(httpPollResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.HttpRequest;
import com.microsoft.rest.http.HttpResponse;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func0;
import rx.functions.Func1;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -104,62 +102,14 @@ public static <A> A create(Class<A> swaggerInterface, String baseUrl, final Http
}

@Override
protected Object handleSyncHttpResponse(HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser) throws IOException, InterruptedException {
protected Object handleAsyncHttpResponse(final HttpRequest httpRequest, Single<HttpResponse> asyncHttpResponse, final SwaggerMethodParser methodParser, final Type returnType) {
final SerializerAdapter<?> serializer = serializer();

final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);
if (pollStrategy != null) {
while (!pollStrategy.isDone()) {
pollStrategy.delay();
Object result;

final HttpRequest pollRequest = pollStrategy.createPollRequest();
httpResponse = sendHttpRequest(pollRequest);

pollStrategy.updateFrom(httpResponse);
}
}

return super.handleSyncHttpResponse(httpRequest, httpResponse, methodParser);
}

@Override
protected Object handleAsyncHttpResponse(final HttpRequest httpRequest, Single<HttpResponse> asyncHttpResponse, final SwaggerMethodParser methodParser) {
final SerializerAdapter<?> serializer = serializer();

Object result = null;

final Type returnType = methodParser.returnType();
final TypeToken returnTypeToken = TypeToken.of(returnType);

if (returnTypeToken.isSubtypeOf(Completable.class) || returnTypeToken.isSubtypeOf(Single.class)) {
asyncHttpResponse = asyncHttpResponse
.flatMap(new Func1<HttpResponse, Single<? extends HttpResponse>>() {
@Override
public Single<? extends HttpResponse> call(HttpResponse httpResponse) {
final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);

Single<HttpResponse> result;
if (pollStrategy == null || pollStrategy.isDone()) {
result = Single.just(httpResponse);
}
else {
result = sendPollRequestWithDelay(pollStrategy)
.repeat()
.takeUntil(new Func1<HttpResponse, Boolean>() {
@Override
public Boolean call(HttpResponse ignored) {
return pollStrategy.isDone();
}
})
.last()
.toSingle();
}
return result;
}
});
result = super.handleAsyncHttpResponse(httpRequest, asyncHttpResponse, methodParser);
}
else if (returnTypeToken.isSubtypeOf(Observable.class)) {
if (returnTypeToken.isSubtypeOf(Observable.class)) {
final Type operationStatusType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
final TypeToken operationStatusTypeToken = TypeToken.of(operationStatusType);
if (!operationStatusTypeToken.isSubtypeOf(OperationStatus.class)) {
Expand Down Expand Up @@ -206,6 +156,34 @@ public Boolean call(OperationStatus<Object> operationStatus) {
});
}
}
else {
asyncHttpResponse = asyncHttpResponse
.flatMap(new Func1<HttpResponse, Single<? extends HttpResponse>>() {
@Override
public Single<? extends HttpResponse> call(HttpResponse httpResponse) {
final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);

Single<HttpResponse> result;
if (pollStrategy == null || pollStrategy.isDone()) {
result = Single.just(httpResponse);
}
else {
result = sendPollRequestWithDelay(pollStrategy)
.repeat()
.takeUntil(new Func1<HttpResponse, Boolean>() {
@Override
public Boolean call(HttpResponse ignored) {
return pollStrategy.isDone();
}
})
.last()
.toSingle();
}
return result;
}
});
result = super.handleAsyncHttpResponse(httpRequest, asyncHttpResponse, methodParser, returnType);
}

return result;
}
Expand Down Expand Up @@ -245,15 +223,13 @@ private static PollStrategy createPollStrategy(HttpRequest httpRequest, HttpResp
private Observable<OperationStatus<Object>> toCompletedOperationStatusObservable(String provisioningState, HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser, Type operationStatusResultType) {
Observable<OperationStatus<Object>> result;
try {
final Object resultObject = super.handleSyncHttpResponse(httpRequest, httpResponse, methodParser, operationStatusResultType);
final Object resultObject = super.handleAsyncHttpResponse(httpRequest, Single.just(httpResponse), methodParser, operationStatusResultType);
result = Observable.just(new OperationStatus<>(resultObject, provisioningState));
} catch (RestException e) {
if (ProvisioningState.SUCCEEDED.equals(provisioningState)) {
provisioningState = ProvisioningState.FAILED;
}
result = Observable.just(new OperationStatus<>(e, provisioningState));
} catch (IOException e) {
result = Observable.error(e);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import com.microsoft.rest.http.HttpResponse;
import rx.Single;

import java.io.IOException;

/**
* A PollStrategy type that uses the Location header value to check the status of a long running
* operation.
Expand Down Expand Up @@ -40,7 +38,7 @@ public HttpRequest createPollRequest() {
}

@Override
public void updateFrom(HttpResponse httpPollResponse) throws IOException {
public Single<HttpResponse> updateFromAsync(HttpResponse httpPollResponse) {
final int httpStatusCode = httpPollResponse.statusCode();
if (httpStatusCode == 202) {
locationUrl = httpPollResponse.headerValue(HEADER_NAME);
Expand All @@ -49,18 +47,7 @@ public void updateFrom(HttpResponse httpPollResponse) throws IOException {
else {
done = true;
}
}

@Override
public Single<HttpResponse> updateFromAsync(HttpResponse httpPollResponse) {
Single<HttpResponse> result;
try {
updateFrom(httpPollResponse);
result = Single.just(httpPollResponse);
} catch (IOException e) {
result = Single.error(e);
}
return result;
return Single.just(httpPollResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.microsoft.rest.http.HttpResponse;
import rx.Single;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -25,14 +24,6 @@ abstract class PollStrategy {
this.delayInMilliseconds = delayInMilliseconds;
}

/**
* Get the number of milliseconds to delay before sending the next poll request.
* @return The number of milliseconds to delay.
*/
final long delayInMilliseconds() {
return delayInMilliseconds;
}

/**
* Set the delay in milliseconds to 0.
*/
Expand All @@ -55,17 +46,6 @@ final void updateDelayInMillisecondsFrom(HttpResponse httpPollResponse) {
}
}

/**
* If this PollStrategy has a retryAfterSeconds value, delay (and block) the current thread for
* the number of seconds that are in the retryAfterSeconds value. If this PollStrategy doesn't
* have a retryAfterSeconds value, then just return.
*/
void delay() throws InterruptedException {
if (delayInMilliseconds > 0) {
Thread.sleep(delayInMilliseconds);
}
}

/**
* If this OperationStatus has a retryAfterSeconds value, return an Single that is delayed by the
* number of seconds that are in the retryAfterSeconds value. If this OperationStatus doesn't have
Expand Down Expand Up @@ -103,12 +83,6 @@ protected void setProvisioningState(String provisioningState) {
*/
abstract HttpRequest createPollRequest();

/**
* Update the status of this PollStrategy from the provided HTTP poll response.
* @param httpPollResponse The response of the most recent poll request.
*/
abstract void updateFrom(HttpResponse httpPollResponse) throws IOException;

/**
* Update the status of this PollStrategy from the provided HTTP poll response.
* @param httpPollResponse The response of the most recent poll request.
Expand Down
73 changes: 29 additions & 44 deletions client-runtime/src/main/java/com/microsoft/rest/RestProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.microsoft.rest.http.HttpResponse;
import com.microsoft.rest.http.UrlBuilder;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
Expand Down Expand Up @@ -71,16 +72,6 @@ protected SerializerAdapter<?> serializer() {
return serializer;
}

/**
* Send the provided request and block until the response is received.
* @param request The HTTP request to send.
* @return The HTTP response received.
* @throws IOException On network issues.
*/
public HttpResponse sendHttpRequest(HttpRequest request) throws IOException {
return httpClient.sendRequest(request);
}

/**
* Send the provided request asynchronously, applying any request policies provided to the HttpClient instance.
* @param request The HTTP request to send.
Expand All @@ -96,17 +87,9 @@ public Object invoke(Object proxy, final Method method, Object[] args) throws IO

final HttpRequest request = createHttpRequest(methodParser, args);

Object result;
if (methodParser.isAsync()) {
final Single<HttpResponse> asyncResponse = sendHttpRequestAsync(request);
result = handleAsyncHttpResponse(request, asyncResponse, methodParser);
}
else {
final HttpResponse response = sendHttpRequest(request);
result = handleSyncHttpResponse(request, response, methodParser);
}
final Single<HttpResponse> asyncResponse = sendHttpRequestAsync(request);

return result;
return handleAsyncHttpResponse(request, asyncResponse, methodParser, methodParser.returnType());
}

/**
Expand Down Expand Up @@ -181,16 +164,6 @@ else if (bodyContentObject instanceof String) {
return request;
}

protected Object handleSyncHttpResponse(HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser) throws IOException, InterruptedException {
final Type returnType = methodParser.returnType();
return handleSyncHttpResponse(httpRequest, httpResponse, methodParser, returnType);
}

protected Object handleSyncHttpResponse(HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser, Type returnType) throws IOException {
ensureExpectedStatus(httpResponse, methodParser).toBlocking().value();
return toProxyReturnValue(httpResponse, methodParser, returnType).toBlocking().value();
}

private Exception instantiateUnexpectedException(SwaggerMethodParser methodParser, HttpResponse response, String responseContent) {
final int responseStatusCode = response.statusCode();
final Class<? extends RestException> exceptionType = methodParser.exceptionType();
Expand Down Expand Up @@ -226,18 +199,18 @@ public HttpResponse call(String responseBody) {
}
});
} else {
asyncResult = Single.just(response);
asyncResult = Single.just(response);
}

return asyncResult;
}

private Single<?> toProxyReturnValue(HttpResponse response, SwaggerMethodParser methodParser, final Type entityType) {
private Single<?> toProxyReturnValueAsync(HttpResponse response, String httpMethod, final Type entityType) {
final TypeToken entityTypeToken = TypeToken.of(entityType);
final Single<?> asyncResult;
if (entityTypeToken.isSubtypeOf(void.class) || entityTypeToken.isSubtypeOf(Void.class)) {
asyncResult = Single.just(null);
} else if (methodParser.httpMethod().equalsIgnoreCase("HEAD")
} else if (httpMethod.equalsIgnoreCase("HEAD")
&& (entityTypeToken.isSubtypeOf(boolean.class) || entityTypeToken.isSubtypeOf(Boolean.class))) {
boolean isSuccess = response.statusCode() / 100 == 2;
asyncResult = Single.just(isSuccess);
Expand Down Expand Up @@ -265,34 +238,46 @@ public Single<Object> call(String responseBodyString) {
return asyncResult;
}

protected Object handleAsyncHttpResponse(HttpRequest httpRequest, Single<HttpResponse> asyncHttpResponse, final SwaggerMethodParser methodParser) {
protected Object handleAsyncHttpResponse(HttpRequest httpRequest, Single<HttpResponse> asyncHttpResponse, final SwaggerMethodParser methodParser, final Type returnType) {
Object result;

final Type returnType = methodParser.returnType();
final TypeToken returnTypeToken = TypeToken.of(returnType);

final Single<HttpResponse> asyncExpectedResponse = asyncHttpResponse.flatMap(new Func1<HttpResponse, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> call(HttpResponse response) {
return ensureExpectedStatus(response, methodParser);
}
});
final Single<HttpResponse> asyncExpectedResponse = asyncHttpResponse
.flatMap(new Func1<HttpResponse, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> call(HttpResponse response) {
return ensureExpectedStatus(response, methodParser);
}
});

if (returnTypeToken.isSubtypeOf(Completable.class)) {
result = Completable.fromSingle(asyncExpectedResponse);
}
else if (returnTypeToken.isSubtypeOf(Single.class)) {
final Type singleTypeParam = ((ParameterizedType) methodParser.returnType()).getActualTypeArguments()[0];
final Type singleTypeParam = ((ParameterizedType) returnType).getActualTypeArguments()[0];
result = asyncExpectedResponse.flatMap(new Func1<HttpResponse, Single<?>>() {
@Override
public Single<?> call(HttpResponse response) {
return toProxyReturnValue(response, methodParser, singleTypeParam);
return toProxyReturnValueAsync(response, methodParser.httpMethod(), singleTypeParam);
}
});
}
else {
else if (returnTypeToken.isSubtypeOf(Observable.class)) {
throw new InvalidReturnTypeException("RestProxy does not support swagger interface methods (such as " + methodParser.fullyQualifiedMethodName() + "()) with a return type of " + returnType.toString());
}
else {
// The return value is not an asynchronous type (Completable, Single, or Observable), so
// block the deserialization until a value is received.
result = asyncExpectedResponse
.flatMap(new Func1<HttpResponse, Single<?>>() {
@Override
public Single<?> call(HttpResponse httpResponse) {
return toProxyReturnValueAsync(httpResponse, methodParser.httpMethod(), returnType);
}
})
.toBlocking().value();
}

return result;
}
Expand Down
Loading

0 comments on commit 77b328a

Please sign in to comment.