diff --git a/.travis.yml b/.travis.yml index bc2a8e6b83..00de321762 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,3 @@ script: - mvn clean install - mvn checkstyle:check - cd ./azure-android-client-authentication && ./gradlew check -env: - global: - secure: RnU4qnuCJRzSvAAXApk9yVhIH+gtl5RNmoVva/hzuSF0WcSaRh2CBe37KUNnbBDJaHd53L5AvHBpbcmaylbOYaRQ/vxUG1gAEHuyrX3ANvGLKYKjtg1F8i853h1Y/y/TZ9MNfzLOqlo/DtD/jAl6pOAMyxNxSEwwFjHY+zbzTOA4kXjTDPGtNwB253v46j5vzSUcEKHpAXER2RjTeurVFeMFDT78Ou+4DCFBqpenObwr1CH+YPTwIoRBvzzPFDKy8+3rdWXOCB2QloaeFntxH1NDbkhReBe4KL3Ue03ksxrAqcmoJR6qFCQebvHYQYSXTk7kmoOxRFTowGqJ0SErUQwT85MA+3JYDmWDKvI3Zq1lQvhmv7dhsbgJASTxpwW+cqStCzKhAYADB6nB8Nv5AUE/5wQDWXEANUQ2t/eWooip0IcIHeJfBj0qDk+WvjAKGSGu0tIAD71Z1h36WHxf0U/fTi673tguwrhWpnUCbQaQCiDqMm7sSWOlyIdAoxy3JwLGaJq3bRGAjgwbfY32VQ/GN00ttSYu63SFDkxYATC4/FtYheNB0Yi4bDU//me9WcB/mt4bi98nC1CBK1tVS1Rn6ATkXu5K9oMvfCX7UlKe6rGmW7tVdXqtwMEFBpH3Z94tXxQEkZ+AITociIBV85+Oy/BOYvMCwysNv/PEqx0= diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java index 112130413f..4e1848fd8c 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; /** * The type representing node in a {@link DAGraph}. @@ -20,6 +21,7 @@ public class DAGNode extends Node { private List dependentKeys; private int toBeResolved; private boolean isPreparer; + private ReentrantLock lock; /** * Creates a DAG node. @@ -30,6 +32,14 @@ public class DAGNode extends Node { public DAGNode(String key, T data) { super(key, data); dependentKeys = new ArrayList<>(); + lock = new ReentrantLock(); + } + + /** + * @return the lock to be used while performing thread safe operation on this node. + */ + public ReentrantLock lock() { + return this.lock; } /** diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java index 58179e0152..153c38f259 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java @@ -7,9 +7,8 @@ package com.microsoft.azure; -import java.util.ArrayDeque; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; /** * Type representing a DAG (directed acyclic graph). @@ -20,7 +19,7 @@ * @param the type of the nodes in the graph */ public class DAGraph> extends Graph { - private Queue queue; + private ConcurrentLinkedQueue queue; private boolean hasParent; private U rootNode; @@ -31,7 +30,7 @@ public class DAGraph> extends Graph { */ public DAGraph(U rootNode) { this.rootNode = rootNode; - this.queue = new ArrayDeque<>(); + this.queue = new ConcurrentLinkedQueue<>(); this.rootNode.setPreparer(true); this.addNode(rootNode); } @@ -103,10 +102,14 @@ public void prepare() { * Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and * ready to be consumed. * - * @return next node or null if all the nodes have been explored + * @return next node or null if all the nodes have been explored or no node is available at this moment. */ public U getNext() { - return graph.get(queue.poll()); + String nextItemKey = queue.poll(); + if (nextItemKey == null) { + return null; + } + return graph.get(nextItemKey); } /** @@ -129,9 +132,14 @@ public void reportedCompleted(U completed) { String dependency = completed.key(); for (String dependentKey : graph.get(dependency).dependentKeys()) { DAGNode dependent = graph.get(dependentKey); - dependent.reportResolved(dependency); - if (dependent.hasAllResolved()) { - queue.add(dependent.key()); + dependent.lock().lock(); + try { + dependent.reportResolved(dependency); + if (dependent.hasAllResolved()) { + queue.add(dependent.key()); + } + } finally { + dependent.lock().unlock(); } } } @@ -145,9 +153,8 @@ public void reportedCompleted(U completed) { */ private void initializeDependentKeys() { visit(new Visitor() { - // This 'visit' will be called only once per each node. @Override - public void visit(U node) { + public void visitNode(U node) { if (node.dependencyKeys().isEmpty()) { return; } @@ -158,6 +165,13 @@ public void visit(U node) { .addDependent(dependentKey); } } + + @Override + public void visitEdge(String fromKey, String toKey, EdgeType edgeType) { + if (edgeType == EdgeType.BACK) { + throw new IllegalStateException("Detected circular dependency: " + findPath(fromKey, toKey)); + } + } }); } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java b/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java index 40ceebaa50..89087dd6d3 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java @@ -23,6 +23,11 @@ public class Graph> { protected Map graph; private Set visited; + private Integer time; + private Map entryTime; + private Map exitTime; + private Map parent; + private Set processed; /** * Creates a directed graph. @@ -30,6 +35,11 @@ public class Graph> { public Graph() { this.graph = new HashMap<>(); this.visited = new HashSet<>(); + this.time = 0; + this.entryTime = new HashMap<>(); + this.exitTime = new HashMap<>(); + this.parent = new HashMap<>(); + this.processed = new HashSet<>(); } /** @@ -41,26 +51,11 @@ public void addNode(U node) { graph.put(node.key(), node); } - /** - * Represents a visitor to be implemented by the consumer who want to visit the - * graph's nodes in DFS order. - * - * @param the type of the node - */ - interface Visitor { - /** - * visit a node. - * - * @param node the node to visited - */ - void visit(U node); - } - /** * Perform DFS visit in this graph. *

* The directed graph will be traversed in DFS order and the visitor will be notified as - * search explores each node + * search explores each node and edge. * * @param visitor the graph visitor */ @@ -71,15 +66,107 @@ public void visit(Visitor visitor) { } } visited.clear(); + time = 0; + entryTime.clear(); + exitTime.clear(); + parent.clear(); + processed.clear(); } private void dfs(Visitor visitor, Node node) { - visitor.visit(node); - visited.add(node.key()); - for (String childKey : node.children()) { - if (!visited.contains(childKey)) { - this.dfs(visitor, this.graph.get(childKey)); + visitor.visitNode(node); + + String fromKey = node.key(); + visited.add(fromKey); + time++; + entryTime.put(fromKey, time); + for (String toKey : node.children()) { + if (!visited.contains(toKey)) { + parent.put(toKey, fromKey); + visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey)); + this.dfs(visitor, this.graph.get(toKey)); + } else { + visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey)); } } + time++; + exitTime.put(fromKey, time); + processed.add(fromKey); + } + + private EdgeType edgeType(String fromKey, String toKey) { + if (parent.containsKey(toKey) && parent.get(toKey).equals(fromKey)) { + return EdgeType.TREE; + } + + if (visited.contains(toKey) && !processed.contains(toKey)) { + return EdgeType.BACK; + } + + if (processed.contains(toKey) && entryTime.containsKey(toKey) && entryTime.containsKey(fromKey)) { + if (entryTime.get(toKey) > entryTime.get(fromKey)) { + return EdgeType.FORWARD; + } + + if (entryTime.get(toKey) < entryTime.get(fromKey)) { + return EdgeType.CROSS; + } + } + + throw new IllegalStateException("Internal Error: Unable to locate the edge type {" + fromKey + ", " + toKey + "}"); + } + + protected String findPath(String start, String end) { + if (start.equals(end)) { + return start; + } else { + return findPath(start, parent.get(end)) + " -> " + end; + } + } + + /** + * The edge types in a graph. + */ + enum EdgeType { + /** + * An edge (u, v) is a tree edge if v is visited the first time. + */ + TREE, + /** + * An edge (u, v) is a forward edge if v is descendant of u. + */ + FORWARD, + /** + * An edge (u, v) is a back edge if v is ancestor of u. + */ + BACK, + /** + * An edge (u, v) is a cross edge if v is neither ancestor or descendant of u. + */ + CROSS + } + + /** + * Represents a visitor to be implemented by the consumer who want to visit the + * graph's nodes in DFS order by calling visit method. + * + * @param the type of the node + */ + interface Visitor { + /** + * visit a node. + * + * @param node the node to visited + */ + void visitNode(U node); + + /** + * visit an edge. + * + * @param fromKey key of the from node + * @param toKey key of the to node + * @param edgeType the edge type + */ + void visitEdge(String fromKey, String toKey, EdgeType edgeType); } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java index 26bbbece26..3ad514d938 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java @@ -63,7 +63,7 @@ public interface TaskGroup> { * @param callback the callback to call on failure or success * @return the handle to the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + ServiceCall executeAsync(ServiceCallback callback); /** * Gets the result of execution of a task in the group. diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java index efc8d30e49..b607a41f54 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java @@ -9,6 +9,9 @@ import com.microsoft.rest.ServiceCall; import com.microsoft.rest.ServiceCallback; +import com.microsoft.rest.ServiceResponse; + +import java.util.concurrent.ConcurrentLinkedQueue; /** * The base implementation of TaskGroup interface. @@ -18,6 +21,7 @@ public abstract class TaskGroupBase implements TaskGroup> { private DAGraph, DAGNode>> dag; + private ParallelServiceCall parallelServiceCall; /** * Creates TaskGroupBase. @@ -27,6 +31,7 @@ public abstract class TaskGroupBase */ public TaskGroupBase(String rootTaskItemId, TaskItem rootTaskItem) { this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem)); + this.parallelServiceCall = new ParallelServiceCall(); } @Override @@ -54,29 +59,17 @@ public void prepare() { @Override public void execute() throws Exception { DAGNode> nextNode = dag.getNext(); - if (nextNode == null) { - return; - } - - if (dag.isRootNode(nextNode)) { - executeRootTask(nextNode.data()); - } else { - nextNode.data().execute(this, nextNode); + while (nextNode != null) { + nextNode.data().execute(); + this.dag().reportedCompleted(nextNode); + nextNode = dag.getNext(); } } @Override - public ServiceCall executeAsync(final ServiceCallback callback) { - final DAGNode> nextNode = dag.getNext(); - if (nextNode == null) { - return null; - } - - if (dag.isRootNode(nextNode)) { - return executeRootTaskAsync(nextNode.data(), callback); - } else { - return nextNode.data().executeAsync(this, nextNode, callback); - } + public ServiceCall executeAsync(final ServiceCallback callback) { + executeReadyTasksAsync(callback); + return parallelServiceCall; } @Override @@ -85,27 +78,90 @@ public T taskResult(String taskId) { } /** - * Executes the root task in this group. - *

- * This method will be invoked when all the task dependencies of the root task are finished - * executing, at this point root task can be executed by consuming the result of tasks it - * depends on. + * Executes all runnable tasks, a task is runnable when all the tasks its depends + * on are finished running. * - * @param task the root task in this group - * @throws Exception the exception + * @param callback the callback */ - public abstract void executeRootTask(TaskItem task) throws Exception; + private void executeReadyTasksAsync(final ServiceCallback callback) { + DAGNode> nextNode = dag.getNext(); + while (nextNode != null) { + ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback)); + this.parallelServiceCall.addCall(serviceCall); + nextNode = dag.getNext(); + } + } /** - * Executes the root task in this group asynchronously. - *

- * This method will be invoked when all the task dependencies of the root task are finished - * executing, at this point root task can be executed by consuming the result of tasks it - * depends on. + * This method create and return a callback for the runnable task stored in the given node. + * This callback wraps the given callback. * - * @param task the root task in this group - * @param callback the callback when the task fails or succeeds - * @return the handle to the REST call + * @param taskNode the node containing runnable task + * @param callback the callback to wrap + * @return the task callback */ - public abstract ServiceCall executeRootTaskAsync(TaskItem task, ServiceCallback callback); + private ServiceCallback taskCallback(final DAGNode> taskNode, final ServiceCallback callback) { + final TaskGroupBase self = this; + return new ServiceCallback() { + @Override + public void failure(Throwable t) { + callback.failure(t); + } + + @Override + public void success(ServiceResponse result) { + self.dag().reportedCompleted(taskNode); + if (self.dag().isRootNode(taskNode)) { + callback.success(result); + } else { + self.executeReadyTasksAsync(callback); + } + } + }; + } + + /** + * Type represents a set of REST calls running possibly in parallel. + */ + private class ParallelServiceCall extends ServiceCall { + private ConcurrentLinkedQueue serviceCalls; + + /** + * Creates a ParallelServiceCall. + */ + ParallelServiceCall() { + super(null); + this.serviceCalls = new ConcurrentLinkedQueue<>(); + } + + /** + * Cancels all the service calls currently executing. + */ + public void cancel() { + for (ServiceCall call : this.serviceCalls) { + call.cancel(); + } + } + + /** + * @return true if the call has been canceled; false otherwise. + */ + public boolean isCancelled() { + for (ServiceCall call : this.serviceCalls) { + if (!call.isCanceled()) { + return false; + } + } + return true; + } + + /** + * Add a call to the list of parallel calls. + * + * @param call the call + */ + private void addCall(ServiceCall call) { + this.serviceCalls.add(call); + } + } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java index fb74c23845..8f0a3459a2 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java @@ -26,21 +26,17 @@ public interface TaskItem { *

* once executed the result will be available through result getter * - * @param taskGroup the task group dispatching tasks - * @param node the node the task item is associated with * @throws Exception exception */ - void execute(TaskGroup> taskGroup, DAGNode> node) throws Exception; + void execute() throws Exception; /** * Executes the task asynchronously. *

* once executed the result will be available through result getter - - * @param taskGroup the task group dispatching tasks - * @param node the node the task item is associated with + * * @param callback callback to call on success or failure * @return the handle of the REST call */ - ServiceCall executeAsync(TaskGroup> taskGroup, DAGNode> node, ServiceCallback callback); + ServiceCall executeAsync(ServiceCallback callback); }