Skip to content

Commit

Permalink
Create VM works
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 12, 2016
1 parent 16c5acd commit 0a7e455
Showing 1 changed file with 11 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,11 @@

package com.microsoft.azure;

import org.apache.commons.lang3.tuple.MutablePair;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* The base implementation of TaskGroup interface.
Expand Down Expand Up @@ -88,36 +81,23 @@ public T taskResult(String taskId) {
*/
private Observable<T> executeReadyTasksAsync() {
DAGNode<U> nextNode = dag.getNext();
Observable<T> rootObservable = null;
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
final DAGNode<U> thisNode = nextNode;
if (dag().isRootNode(nextNode)) {
rootObservable = nextNode.data().executeAsync()
.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
dag().reportedCompleted(thisNode);
}
});
} else {
Observable<T> nextNodeObservable = nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeReadyTasksAsync();
}
});
observables.add(nextNodeObservable);
}
}
}));
nextNode = dag.getNext();
}
if (rootObservable != null) {
return rootObservable;
}
else {
return Observable.merge(observables).last();
}
return Observable.merge(observables).last();
}
}

0 comments on commit 0a7e455

Please sign in to comment.