Skip to content

Commit

Permalink
Simplify the parallel async logic and adding circular depedency detec…
Browse files Browse the repository at this point in the history
…tion logic
  • Loading branch information
anuchandy committed Jul 24, 2016
1 parent b6d1b5e commit 0493b5e
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ public void reportedCompleted(U completed) {
*/
private void initializeDependentKeys() {
visit(new Visitor<U>() {
// This 'visit' will be called only once per each node.
@Override
public void visit(U node) {
public void visitNode(U node) {
if (node.dependencyKeys().isEmpty()) {
return;
}
Expand All @@ -166,6 +165,14 @@ public void visit(U node) {
.addDependent(dependentKey);
}
}

@Override
public void visitEdge(String fromKey, String toKey, GraphEdgeType edgeType) {
System.out.println("{" + fromKey + ", " + toKey + "} " + edgeType);
if (edgeType == GraphEdgeType.BACK) {
throw new IllegalStateException("Detected circular dependency: " + findPath(fromKey, toKey));
}
}
});
}

Expand Down
89 changes: 82 additions & 7 deletions azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
import java.util.Map;
import java.util.Set;

/**
* The edge types in a graph.
*/
enum GraphEdgeType {
TREE,
FORWARD,
BACK,
CROSS
}

/**
* Type representing a directed graph data structure.
* <p>
Expand All @@ -23,13 +33,23 @@
public class Graph<T, U extends Node<T>> {
protected Map<String, U> graph;
private Set<String> visited;
private Integer time;
private Map<String, Integer> entryTime;
private Map<String, Integer> exitTime;
private Map<String, String> parent;
private Set<String> processed;

/**
* Creates a directed graph.
*/
public Graph() {
this.graph = new HashMap<>();
this.visited = new HashSet<>();
this.time = 0;
this.entryTime = new HashMap<>();
this.exitTime = new HashMap<>();
this.parent = new HashMap<>();
this.processed = new HashSet<>();
}

/**
Expand All @@ -53,14 +73,23 @@ interface Visitor<U> {
*
* @param node the node to visited
*/
void visit(U node);
void visitNode(U node);

/**
* visit an edge.
*
* @param fromKey key of the from node
* @param toKey key of the to node
* @param graphEdgeType the edge type
*/
void visitEdge(String fromKey, String toKey, GraphEdgeType graphEdgeType);
}

/**
* Perform DFS visit in this graph.
* <p>
* The directed graph will be traversed in DFS order and the visitor will be notified as
* search explores each node
* search explores each node and edge.
*
* @param visitor the graph visitor
*/
Expand All @@ -71,15 +100,61 @@ public void visit(Visitor visitor) {
}
}
visited.clear();
time = 0;
entryTime.clear();
exitTime.clear();
parent.clear();
processed.clear();
}

private void dfs(Visitor visitor, Node<T> node) {
visitor.visit(node);
visited.add(node.key());
for (String childKey : node.children()) {
if (!visited.contains(childKey)) {
this.dfs(visitor, this.graph.get(childKey));
visitor.visitNode(node);

String fromKey = node.key();
visited.add(fromKey);
time++;
entryTime.put(fromKey, time);
for (String toKey : node.children()) {
if (!visited.contains(toKey)) {
parent.put(toKey, fromKey);
visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey));
this.dfs(visitor, this.graph.get(toKey));
} else {
visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey));
}
}
time++;
exitTime.put(fromKey, time);
processed.add(fromKey);
}

private GraphEdgeType edgeType(String fromKey, String toKey) {
if (parent.containsKey(toKey) && parent.get(toKey).equals(fromKey)) {
return GraphEdgeType.TREE;
}

if (visited.contains(toKey) && !processed.contains(toKey)) {
return GraphEdgeType.BACK;
}

if (processed.contains(toKey) && entryTime.containsKey(toKey) && entryTime.containsKey(fromKey)) {
if (entryTime.get(toKey) > entryTime.get(fromKey)) {
return GraphEdgeType.FORWARD;
}

if (entryTime.get(toKey) < entryTime.get(fromKey)) {
return GraphEdgeType.CROSS;
}
}

throw new IllegalStateException("Internal Error: Unable to locate the edge type {" + fromKey + ", " + toKey + "}");
}

protected String findPath(String start, String end) {
if (start.equals(end)) {
return start;
} else {
return findPath(start, parent.get(end)) + " -> " + end;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,10 @@

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
/**
* An instance of this class provides access to the underlying REST service call running
* in parallel.
*
* @param <T>
*/
class ParallelServiceCall<T> extends ServiceCall {
private TaskGroupBase<T> taskGroup;

/**
* Creates a ParallelServiceCall.
*
* @param taskGroup the task group
*/
ParallelServiceCall(TaskGroupBase<T> taskGroup) {
super(null);
this.taskGroup = taskGroup;
}

/**
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall call : this.taskGroup.calls()) {
call.cancel();
}
}

/**
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall call : this.taskGroup.calls()) {
if (!call.isCanceled()) {
return false;
}
}
return true;
}
}

/**
* The base implementation of TaskGroup interface.
*
Expand All @@ -65,8 +21,7 @@ public boolean isCancelled() {
public abstract class TaskGroupBase<T>
implements TaskGroup<T, TaskItem<T>> {
private DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag;
private ConcurrentLinkedQueue<ServiceCall> serviceCalls = new ConcurrentLinkedQueue<>();
private ParallelServiceCall<T> parallelServiceCall;
private ParallelServiceCall parallelServiceCall;

/**
* Creates TaskGroupBase.
Expand All @@ -76,11 +31,7 @@ public abstract class TaskGroupBase<T>
*/
public TaskGroupBase(String rootTaskItemId, TaskItem<T> rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
this.parallelServiceCall = new ParallelServiceCall<>(this);
}

List<ServiceCall> calls() {
return Collections.unmodifiableList(Arrays.asList(serviceCalls.toArray(new ServiceCall[0])));
this.parallelServiceCall = new ParallelServiceCall();
}

@Override
Expand All @@ -93,14 +44,6 @@ public boolean isPreparer() {
return dag.isPreparer();
}

/**
* @return Gets the ParallelServiceCall instance that wraps the service calls running
* in parallel.
*/
public ParallelServiceCall<T> parallelServiceCall() {
return this.parallelServiceCall;
}

@Override
public void merge(TaskGroup<T, TaskItem<T>> parentTaskGroup) {
dag.merge(parentTaskGroup.dag());
Expand All @@ -116,32 +59,109 @@ public void prepare() {
@Override
public void execute() throws Exception {
DAGNode<TaskItem<T>> nextNode = dag.getNext();
if (nextNode == null) {
return;
while (nextNode != null) {
nextNode.data().execute();
this.dag().reportedCompleted(nextNode);
nextNode = dag.getNext();
}

nextNode.data().execute(this, nextNode);
}

@Override
public ServiceCall executeAsync(final ServiceCallback<T> callback) {
ServiceCall serviceCall = null;
executeReadyTasksAsync(callback);
return parallelServiceCall;
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}

/**
* Executes all runnable tasks, a task is runnable when all the tasks its depends
* on are finished running.
*
* @param callback the callback
*/
private void executeReadyTasksAsync(final ServiceCallback<T> callback) {
DAGNode<TaskItem<T>> nextNode = dag.getNext();
while (nextNode != null) {
serviceCall = nextNode.data().executeAsync(this, nextNode, dag.isRootNode(nextNode), callback);
if (serviceCall != null) {
// Filter out the null value returned by executeAsync. that happen
// when TaskItem::executeAsync invokes TaskGroupBase::executeAsync
// but there is no task available in the queue at the moment.
this.serviceCalls.add(serviceCall);
}
ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback));
this.parallelServiceCall.addCall(serviceCall);
nextNode = dag.getNext();
}
return serviceCall;
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
/**
* This method create and return a callback for the runnable task stored in the given node.
* This callback wraps the given callback.
*
* @param taskNode the node containing runnable task
* @param callback the callback to wrap
* @return the task callback
*/
private ServiceCallback<T> taskCallback(final DAGNode<TaskItem<T>> taskNode, final ServiceCallback<T> callback) {
final TaskGroupBase<T> self = this;
return new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
callback.failure(t);
}

@Override
public void success(ServiceResponse<T> result) {
self.dag().reportedCompleted(taskNode);
if (self.dag().isRootNode(taskNode)) {
callback.success(result);
} else {
self.executeReadyTasksAsync(callback);
}
}
};
}

/**
* Type represents a set of REST calls running possibly in parallel.
*/
private class ParallelServiceCall extends ServiceCall {
private ConcurrentLinkedQueue<ServiceCall> serviceCalls;

/**
* Creates a ParallelServiceCall.
*/
ParallelServiceCall() {
super(null);
this.serviceCalls = new ConcurrentLinkedQueue<>();
}

/**
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall call : this.serviceCalls) {
call.cancel();
}
}

/**
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall call : this.serviceCalls) {
if (!call.isCanceled()) {
return false;
}
}
return true;
}

/**
* Add a call to the list of parallel calls.
*
* @param call the call
*/
private void addCall(ServiceCall call) {
this.serviceCalls.add(call);
}
}
}
Loading

0 comments on commit 0493b5e

Please sign in to comment.