Skip to content

Commit

Permalink
Paging async works
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 25, 2016
1 parent 148e463 commit a1cd0f1
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@

import com.microsoft.rest.ServiceResponse;
import com.microsoft.rest.ServiceResponseWithHeaders;
import okhttp3.ResponseBody;
import retrofit2.Response;
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.Url;
import rx.Observable;
import rx.Statement;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;

import java.io.IOException;
import java.lang.reflect.Type;
Expand All @@ -29,6 +19,14 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import okhttp3.ResponseBody;
import retrofit2.Response;
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.Url;
import rx.Observable;
import rx.functions.Func1;

/**
* An instance of this class defines a ServiceClient that handles polling and
* retrying for long running operations when accessing Azure resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/**
Expand All @@ -22,48 +21,75 @@
*
* @param <T> the type of the returning object
*/
public class AzureServiceCall<T> extends ServiceCall<T> {
public final class AzureServiceCall<T> extends ServiceCall<T> {
private AzureServiceCall() {
}

/**
* Creates a ServiceCall from a paging operation.
*
* @param first the observable to the first page
* @param next the observable to poll subsequent pages
* @param callback the client-side callback
* @param <T> the Page type
* @param <V> the element type
* @return the future based ServiceCall
*/
public static <T extends Page<V>, V> ServiceCall<T> create(Observable<ServiceResponse<T>> first, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
final AzureServiceCall<T> serviceCall = new AzureServiceCall<>();
final Subscriber<ServiceResponse<T>> subscriber = new Subscriber<ServiceResponse<T>>() {
private ServiceResponse<T> lastResponse;
final PagingSubscriber<T, V> subscriber = new PagingSubscriber<>(serviceCall, next, callback);
serviceCall.setSubscription(first
.single()
.subscribe(subscriber));
return serviceCall;
}

@Override
public void onCompleted() {
// do nothing
}
/**
* The subscriber that handles user callback and automatically subscribes to the next page.
*
* @param <T> the Page type
* @param <V> the element type
*/
private static class PagingSubscriber<T extends Page<V>, V> extends Subscriber<ServiceResponse<T>> {
private AzureServiceCall<T> serviceCall;
private Func1<String, Observable<ServiceResponse<T>>> next;
private ListOperationCallback<V> callback;
private ServiceResponse<T> lastResponse;

@Override
public void onError(Throwable e) {
serviceCall.setException(e);
if (callback != null) {
callback.failure(e);
}
PagingSubscriber(final AzureServiceCall<T> serviceCall, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
this.serviceCall = serviceCall;
this.next = next;
this.callback = callback;
}

@Override
public void onCompleted() {
// do nothing
}

@Override
public void onError(Throwable e) {
serviceCall.setException(e);
if (callback != null) {
callback.failure(e);
}
}

@Override
public void onNext(ServiceResponse<T> serviceResponse) {
lastResponse = serviceResponse;
ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE;
if (callback != null) {
behavior = callback.progress(serviceResponse.getBody().getItems());
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
callback.success();
}
}
@Override
public void onNext(ServiceResponse<T> serviceResponse) {
lastResponse = serviceResponse;
ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE;
if (callback != null) {
behavior = callback.progress(serviceResponse.getBody().getItems());
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
serviceCall.set(lastResponse);
} else {
serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(subscriber));
callback.success();
}
}
};
serviceCall.setSubscription(first
.single()
.subscribe(subscriber));
return serviceCall;
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
serviceCall.set(lastResponse);
} else {
serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void success(ServiceResponse<List<E>> result) {
success();
}

/**
* Override this method to handle successful REST call results.
*/
public abstract void success();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import com.microsoft.rest.RestException;

import javax.xml.bind.DataBindingException;
import javax.xml.ws.WebServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -19,6 +17,8 @@
import java.util.ListIterator;
import java.util.NoSuchElementException;

import javax.xml.bind.DataBindingException;

/**
* Defines a list response from a paging operation. The pages are
* lazy initialized when an instance of this class is iterated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public Subscription getSubscription() {
return subscription;
}

protected void setSubscription(Subscription subscription) {
public void setSubscription(Subscription subscription) {
this.subscription = subscription;
}

Expand Down

0 comments on commit a1cd0f1

Please sign in to comment.