Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into res…
Browse files Browse the repository at this point in the history
…ources-create
  • Loading branch information
anuchandy committed Aug 8, 2016
2 parents 652d622 + 2701ea4 commit 44a1519
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public <T, THeader> ServiceResponseWithHeaders<T, THeader> getPutOrPatchResultWi
* @param callback the user callback to call when operation terminates.
* @return the task describing the asynchronous polling.
*/
public <T> AsyncPollingTask<T> getPutOrPatchResultAsync(Response<ResponseBody> response, Type resourceType, ServiceCall serviceCall, ServiceCallback<T> callback) {
public <T> AsyncPollingTask<T> getPutOrPatchResultAsync(Response<ResponseBody> response, Type resourceType, ServiceCall<T> serviceCall, ServiceCallback<T> callback) {
if (response == null) {
callback.failure(new ServiceException("response is null."));
return null;
Expand Down Expand Up @@ -211,7 +211,7 @@ public <T> AsyncPollingTask<T> getPutOrPatchResultAsync(Response<ResponseBody> r
* @param callback the user callback to call when operation terminates.
* @return the task describing the asynchronous polling.
*/
public <T, THeader> AsyncPollingTask<T> getPutOrPatchResultWithHeadersAsync(Response<ResponseBody> response, Type resourceType, final Class<THeader> headerType, final ServiceCall serviceCall, final ServiceCallback<T> callback) {
public <T, THeader> AsyncPollingTask<T> getPutOrPatchResultWithHeadersAsync(Response<ResponseBody> response, Type resourceType, final Class<THeader> headerType, final ServiceCall<T> serviceCall, final ServiceCallback<T> callback) {
return this.getPutOrPatchResultAsync(response, resourceType, serviceCall, new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
Expand Down Expand Up @@ -329,7 +329,7 @@ public <T, THeader> ServiceResponseWithHeaders<T, THeader> getPostOrDeleteResult
* @param callback the user callback to call when operation terminates.
* @return the task describing the asynchronous polling.
*/
public <T> AsyncPollingTask<T> getPostOrDeleteResultAsync(Response<ResponseBody> response, Type resourceType, ServiceCall serviceCall, ServiceCallback<T> callback) {
public <T> AsyncPollingTask<T> getPostOrDeleteResultAsync(Response<ResponseBody> response, Type resourceType, ServiceCall<T> serviceCall, ServiceCallback<T> callback) {
if (response == null) {
callback.failure(new ServiceException("response is null."));
return null;
Expand Down Expand Up @@ -383,7 +383,7 @@ public <T> AsyncPollingTask<T> getPostOrDeleteResultAsync(Response<ResponseBody>
* @param callback the user callback to call when operation terminates.
* @return the task describing the asynchronous polling.
*/
public <T, THeader> AsyncPollingTask<T> getPostOrDeleteResultWithHeadersAsync(Response<ResponseBody> response, Type resourceType, final Class<THeader> headerType, final ServiceCall serviceCall, final ServiceCallback<T> callback) {
public <T, THeader> AsyncPollingTask<T> getPostOrDeleteResultWithHeadersAsync(Response<ResponseBody> response, Type resourceType, final Class<THeader> headerType, final ServiceCall<T> serviceCall, final ServiceCallback<T> callback) {
return this.getPostOrDeleteResultAsync(response, resourceType, serviceCall, new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
Expand Down Expand Up @@ -689,7 +689,7 @@ private Call<ResponseBody> pollAsync(String url, final ServiceCallback<ResponseB
}
AsyncService service = restClient().retrofit().create(AsyncService.class);
Call<ResponseBody> call = service.get(endpoint.getFile(), serviceClientUserAgent);
call.enqueue(new ServiceResponseCallback<ResponseBody>(callback) {
call.enqueue(new ServiceResponseCallback<ResponseBody>(null, callback) {
@Override
public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
try {
Expand Down Expand Up @@ -750,7 +750,7 @@ private interface AsyncService {
*/
abstract class AsyncPollingTask<T> implements Runnable {
/** The {@link Call} object from Retrofit. */
protected ServiceCall serviceCall;
protected ServiceCall<T> serviceCall;
/** The polling state for the current operation. */
protected PollingState<T> pollingState;
/** The callback used for asynchronous polling. */
Expand All @@ -776,7 +776,7 @@ class PutPatchPollingTask<T> extends AsyncPollingTask<T> {
* @param serviceCall the ServiceCall object tracking Retrofit calls.
* @param clientCallback the client callback to call when a terminal status is hit.
*/
PutPatchPollingTask(final PollingState<T> pollingState, final String url, final ServiceCall serviceCall, final ServiceCallback<T> clientCallback) {
PutPatchPollingTask(final PollingState<T> pollingState, final String url, final ServiceCall<T> serviceCall, final ServiceCallback<T> clientCallback) {
this.serviceCall = serviceCall;
this.pollingState = pollingState;
this.url = url;
Expand Down Expand Up @@ -833,14 +833,17 @@ class PostDeletePollingTask<T> extends AsyncPollingTask<T> {
* @param serviceCall the ServiceCall object tracking Retrofit calls.
* @param clientCallback the client callback to call when a terminal status is hit.
*/
PostDeletePollingTask(final PollingState<T> pollingState, final ServiceCall serviceCall, final ServiceCallback<T> clientCallback) {
PostDeletePollingTask(final PollingState<T> pollingState, final ServiceCall<T> serviceCall, final ServiceCallback<T> clientCallback) {
this.serviceCall = serviceCall;
this.pollingState = pollingState;
this.clientCallback = clientCallback;
this.pollingCallback = new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
clientCallback.failure(t);
if (clientCallback != null) {
clientCallback.failure(t);
}
serviceCall.failure(t);
}

@Override
Expand All @@ -861,14 +864,23 @@ public void run() {
&& !pollingState.getLocationHeaderLink().isEmpty()) {
updateStateFromLocationHeaderOnPostOrDeleteAsync(pollingState, pollingCallback);
} else {
pollingCallback.failure(new ServiceException("No header in response"));
ServiceException serviceException = new ServiceException("No async header in response");
pollingCallback.failure(serviceException);
}
} else {
// Check if operation failed
if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) {
clientCallback.failure(new ServiceException("Async operation failed"));
ServiceException serviceException = new ServiceException("Async operation failed");
if (clientCallback != null) {
clientCallback.failure(serviceException);
}
serviceCall.failure(serviceException);
} else {
clientCallback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()));
ServiceResponse<T> serviceResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse());
if (clientCallback != null) {
clientCallback.success(serviceResponse);
}
serviceCall.success(serviceResponse);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void execute() throws Exception {
}

@Override
public ServiceCall executeAsync(final ServiceCallback<T> callback) {
public ServiceCall<T> executeAsync(final ServiceCallback<T> callback) {
executeReadyTasksAsync(callback);
return parallelServiceCall;
}
Expand Down Expand Up @@ -107,13 +107,17 @@ private ServiceCallback<T> taskCallback(final DAGNode<U> taskNode, final Service
@Override
public void failure(Throwable t) {
callback.failure(t);
parallelServiceCall.failure(t);
}

@Override
public void success(ServiceResponse<T> result) {
self.dag().reportedCompleted(taskNode);
if (self.dag().isRootNode(taskNode)) {
callback.success(result);
if (callback != null) {
callback.success(result);
}
parallelServiceCall.success(result);
} else {
self.executeReadyTasksAsync(callback);
}
Expand All @@ -124,8 +128,8 @@ public void success(ServiceResponse<T> result) {
/**
* Type represents a set of REST calls running possibly in parallel.
*/
private class ParallelServiceCall extends ServiceCall {
private ConcurrentLinkedQueue<ServiceCall> serviceCalls;
private class ParallelServiceCall extends ServiceCall<T> {
private ConcurrentLinkedQueue<ServiceCall<?>> serviceCalls;

/**
* Creates a ParallelServiceCall.
Expand All @@ -139,7 +143,7 @@ private class ParallelServiceCall extends ServiceCall {
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall call : this.serviceCalls) {
for (ServiceCall<?> call : this.serviceCalls) {
call.cancel();
}
}
Expand All @@ -148,7 +152,7 @@ public void cancel() {
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall call : this.serviceCalls) {
for (ServiceCall<?> call : this.serviceCalls) {
if (!call.isCanceled()) {
return false;
}
Expand Down
28 changes: 27 additions & 1 deletion client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

package com.microsoft.rest;

import com.google.common.util.concurrent.AbstractFuture;

import retrofit2.Call;

/**
* An instance of this class provides access to the underlying REST call invocation.
* This class wraps around the Retrofit Call object and allows updates to it in the
* progress of a long running operation or a paging operation.
*
* @param <T> the type of the returning object
*/
public class ServiceCall {
public class ServiceCall<T> extends AbstractFuture<ServiceResponse<T>> {
/**
* The Retrofit method invocation.
*/
Expand Down Expand Up @@ -62,4 +66,26 @@ public void cancel() {
public boolean isCanceled() {
return call.isCanceled();
}

/**
* Invoke this method to report completed, allowing
* {@link AbstractFuture#get()} to be unblocked.
*
* @param result the service response returned.
* @return true if successfully reported; false otherwise.
*/
public boolean success(ServiceResponse<T> result) {
return set(result);
}

/**
* Invoke this method to report a failure, allowing
* {@link AbstractFuture#get()} to throw the exception.
*
* @param t the exception thrown.
* @return true if successfully reported; false otherwise.
*/
public boolean failure(Throwable t) {
return setException(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
* @param <T> the response body type
*/
public abstract class ServiceResponseCallback<T> implements Callback<ResponseBody> {
/**
* The client service call object.
*/
private ServiceCall<T> serviceCall;

/**
* The client callback.
*/
Expand All @@ -26,14 +31,21 @@ public abstract class ServiceResponseCallback<T> implements Callback<ResponseBod
/**
* Creates an instance of ServiceResponseCallback.
*
* @param serviceCall the client service call to call on a terminal state.
* @param serviceCallback the client callback to call on a terminal state.
*/
public ServiceResponseCallback(ServiceCallback<T> serviceCallback) {
public ServiceResponseCallback(ServiceCall<T> serviceCall, ServiceCallback<T> serviceCallback) {
this.serviceCall = serviceCall;
this.serviceCallback = serviceCallback;
}

@Override
public void onFailure(Call<ResponseBody> call, Throwable t) {
serviceCallback.failure(new ServiceException(t));
if (serviceCallback != null) {
serviceCallback.failure(t);
}
if (serviceCall != null) {
serviceCall.failure(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
* @param <T> the response body type
*/
public abstract class ServiceResponseEmptyCallback<T> implements Callback<Void> {
/**
* The client service call object.
*/
private ServiceCall<T> serviceCall;

/**
* The client callback.
*/
Expand All @@ -25,14 +30,21 @@ public abstract class ServiceResponseEmptyCallback<T> implements Callback<Void>
/**
* Creates an instance of ServiceResponseCallback.
*
* @param serviceCall the client service call to call on a terminal state.
* @param serviceCallback the client callback to call on a terminal state.
*/
public ServiceResponseEmptyCallback(ServiceCallback<T> serviceCallback) {
public ServiceResponseEmptyCallback(ServiceCall<T> serviceCall, ServiceCallback<T> serviceCallback) {
this.serviceCall = serviceCall;
this.serviceCallback = serviceCallback;
}

@Override
public void onFailure(Call<Void> call, Throwable t) {
serviceCallback.failure(new ServiceException(t));
if (serviceCallback != null) {
serviceCallback.failure(t);
}
if (serviceCall != null) {
serviceCall.failure(t);
}
}
}

0 comments on commit 44a1519

Please sign in to comment.