Skip to content

Commit

Permalink
Merge pull request Azure#53 from Azure/sdk_1060
Browse files Browse the repository at this point in the history
[Automatic PR] SDK changes from pull request Azure#1060
  • Loading branch information
jianghaolu authored Sep 13, 2016
2 parents 524db4a + 626b093 commit a29b72b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceResponse;
import com.microsoft.rest.ServiceResponseWithHeaders;

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

import java.util.List;

/**
* An instance of this class provides access to the underlying REST call invocation.
* This class wraps around the Retrofit Call object and allows updates to it in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,31 @@ public Observable<T> executeAsync() {
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
final DAGNode<U> thisNode = nextNode;
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
T cachedResult = nextNode.data().result();
if (cachedResult != null && !this.dag().isRootNode(nextNode)) {
observables.add(Observable.just(cachedResult)
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
return executeAsync();
}
}
}));
})
);
} else {
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeAsync();
}
}
}));
}
nextNode = dag.getNext();
}
return Observable.merge(observables);
Expand Down
12 changes: 11 additions & 1 deletion client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package com.microsoft.rest;

import com.google.common.util.concurrent.AbstractFuture;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
Expand Down Expand Up @@ -130,6 +129,17 @@ protected void setSubscription(Subscription subscription) {
this.subscription = subscription;
}

/**
* Invoke this method to report completed, allowing
* {@link AbstractFuture#get()} to be unblocked.
*
* @param result the service response returned.
* @return true if successfully reported; false otherwise.
*/
public boolean success(T result) {
return set(result);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
subscription.unsubscribe();
Expand Down

0 comments on commit a29b72b

Please sign in to comment.