Skip to content

Commit

Permalink
Merge pull request #670 from xingwu1/autorest
Browse files Browse the repository at this point in the history
Fix bugs in the createtasks function
  • Loading branch information
jianghaolu committed Apr 27, 2016
2 parents 913cb61 + 02dffbe commit 3e1439f
Show file tree
Hide file tree
Showing 15 changed files with 402 additions and 4,592 deletions.
1 change: 1 addition & 0 deletions azure-batch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
197 changes: 168 additions & 29 deletions azure-batch/src/main/java/com/microsoft/azure/batch/TaskOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
package com.microsoft.azure.batch;

import com.microsoft.azure.PagedList;
import com.microsoft.azure.batch.interceptor.BatchClientParallelOptions;
import com.microsoft.azure.batch.protocol.models.*;
import com.microsoft.rest.ServiceResponseWithHeaders;
import com.sun.javafx.tk.Toolkit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class TaskOperations implements IInheritedBehaviors {
TaskOperations(BatchClient batchClient, Collection<BatchClientBehavior> customBehaviors) {
Expand Down Expand Up @@ -50,48 +50,187 @@ public void createTask(String jobId, TaskAddParameter taskToAdd, Iterable<BatchC
this._parentBatchClient.getProtocolLayer().getTaskOperations().add(jobId, taskToAdd, options);
}

public void createTasks(String jobId, List<TaskAddParameter> taskList) throws BatchErrorException, IOException {
public void createTasks(String jobId, List<TaskAddParameter> taskList) throws BatchErrorException, IOException, InterruptedException {
createTasks(jobId, taskList, null);
}

public void createTasks(String jobId, List<TaskAddParameter> taskList, Iterable<BatchClientBehavior> additionalBehaviors) throws BatchErrorException, IOException {
private static class WorkingThread implements Runnable {

final int MAX_TASKS_PER_REQUEST = 100;

TaskAddCollectionOptions options = new TaskAddCollectionOptions();
BehaviorManager bhMgr = new BehaviorManager(this.getCustomBehaviors(), additionalBehaviors);
bhMgr.applyRequestBehaviors(options);
private BatchClient client;
private BehaviorManager bhMgr;
private String jobId;
private Queue<TaskAddParameter> pendingList;
private List<TaskAddResult> failures;
private volatile Exception exception;
private final Object lock;

WorkingThread(BatchClient client, BehaviorManager bhMgr, String jobId, Queue<TaskAddParameter> pendingList, List<TaskAddResult> failures, Object lock) {
this.client = client;
this.bhMgr = bhMgr;
this.jobId = jobId;
this.pendingList = pendingList;
this.failures = failures;
this.exception = null;
this.lock = lock;
}

List<TaskAddParameter> pendingList = new ArrayList<>(taskList);
public Exception getException() {
return this.exception;
}

while (!pendingList.isEmpty()) {
List<TaskAddParameter> currentList = pendingList.subList(0, MAX_TASKS_PER_REQUEST - 1);
pendingList.removeAll(currentList);

ServiceResponseWithHeaders<TaskAddCollectionResult, TaskAddCollectionHeaders> response = this._parentBatchClient.getProtocolLayer().getTaskOperations().addCollection(jobId, currentList, options);
if (response.getBody() != null && response.getBody().getValue() != null) {
List<TaskAddResult> failures = new ArrayList<>();

for (TaskAddResult result : response.getBody().getValue()) {
if (result.getError() != null){
if (result.getStatus() == TaskAddStatus.SERVERERROR) {
for (TaskAddParameter addParameter : taskList) {
if (addParameter.getId() == result.getTaskId()) {
pendingList.add(addParameter);
break;
@Override
public void run() {

List<TaskAddParameter> taskList = new LinkedList<>();

// Take the task from the queue up to MAX_TASKS_PER_REQUEST
int count = 0;
while (count < MAX_TASKS_PER_REQUEST) {
TaskAddParameter param = pendingList.poll();
if (param != null) {
taskList.add(param);
count++;
}
else {
break;
}
}

if (taskList.size() > 0) {
// The option should be different to every server calls (for example, client-request-id)
TaskAddCollectionOptions options = new TaskAddCollectionOptions();
this.bhMgr.applyRequestBehaviors(options);

try {
ServiceResponseWithHeaders<TaskAddCollectionResult, TaskAddCollectionHeaders> response = this.client.getProtocolLayer().getTaskOperations().addCollection(this.jobId, taskList, options);

if (response.getBody() != null && response.getBody().getValue() != null) {
for (TaskAddResult result : response.getBody().getValue()) {
if (result.getError() != null) {
if (result.getStatus() == TaskAddStatus.SERVERERROR) {
// Server error will be retried
for (TaskAddParameter addParameter : taskList) {
if (addParameter.getId().equals(result.getTaskId())) {
pendingList.add(addParameter);
break;
}
}
} else if (result.getStatus() == TaskAddStatus.CLIENTERROR && !result.getError().getCode().equals(BatchErrorCodeStrings.TaskExists)) {
// Client error will be recorded
failures.add(result);
}
}
}
else if (result.getStatus() == TaskAddStatus.CLIENTERROR && result.getError().getCode() != BatchErrorCodeStrings.TaskExists) {
failures.add(result);
}
} catch (BatchErrorException | IOException e) {
// Any exception will stop further call
exception = e;
pendingList.addAll(taskList);
}
}

synchronized (lock) {
// Notify main thread that sub thread finished
lock.notify();
}
}
}

public void createTasks(String jobId, List<TaskAddParameter> taskList, Iterable<BatchClientBehavior> additionalBehaviors) throws BatchErrorException, IOException, InterruptedException {

BehaviorManager bhMgr = new BehaviorManager(this.getCustomBehaviors(), additionalBehaviors);

// Default thread number is 1
int threadNumber = 1;

// Get user defined thread number
for (BatchClientBehavior op : bhMgr.getMasterListOfBehaviors()) {
if (op instanceof BatchClientParallelOptions) {
threadNumber = ((BatchClientParallelOptions) op).getMaxDegreeOfParallelism();
break;
}
}

final Object lock = new Object();
ConcurrentLinkedQueue<TaskAddParameter> pendingList = new ConcurrentLinkedQueue<>(taskList);
CopyOnWriteArrayList<TaskAddResult> failures = new CopyOnWriteArrayList<>();

Map<Thread, WorkingThread> threads = new HashMap<>();
Exception innerException = null;

while (!pendingList.isEmpty()) {

if (threads.size() < threadNumber) {
// Kick as many as possible add tasks requests by max allowed threads
WorkingThread worker = new WorkingThread(this._parentBatchClient, bhMgr, jobId, pendingList, failures, lock);
Thread thread = new Thread(worker);
thread.start();
threads.put(thread, worker);
}
else {
// Wait any thread finished
synchronized (lock) {
lock.wait();
}

List<Thread> finishedThreads = new ArrayList<>();
for (Thread t : threads.keySet()) {
if (t.getState() == Thread.State.TERMINATED) {
finishedThreads.add(t);
// If any exception happened, do not care the left requests
innerException = threads.get(t).getException();
if (innerException != null) {
break;
}
}
}

if (!failures.isEmpty()) {
throw new CreateTasksTerminatedException("At least one task failed to be added.", failures, pendingList);
// Free thread pool, so we can start more threads to send the request
threads.keySet().removeAll(finishedThreads);

// Any errors happened, we stop
if (innerException != null || !failures.isEmpty()) {
break;
}
}
}

// May sure all the left threads finished
for (Thread t : threads.keySet()) {
t.join();
}

if (innerException == null) {
// Anything bad happened at the left threads?
for (Thread t : threads.keySet()) {
innerException = threads.get(t).getException();
if (innerException != null) {
break;
}
}
}

if (innerException != null) {
// We throw any exception happened in sub thread
if (innerException instanceof BatchErrorException) {
throw (BatchErrorException) innerException;
} else {
throw (IOException) innerException;
}
}

if (!failures.isEmpty()) {
// Report any client error with leftover request
List<TaskAddParameter> notFinished = new ArrayList<>();
for (TaskAddParameter param : pendingList) {
notFinished.add(param);
}
throw new CreateTasksTerminatedException("At least one task failed to be added.", failures, notFinished);
}

// We succeed here
}

public List<CloudTask> listTasks(String jobId) throws BatchErrorException, IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.azure.batch.interceptor;

import com.microsoft.azure.batch.BatchClientBehavior;

public class BatchClientParallelOptions extends BatchClientBehavior {

private int maxDegreeOfParallelism;

/// <summary>
/// Gets or sets the maximum number of concurrent tasks enabled by this <see cref="BatchClientParallelOptions"/> instance.
/// The default value is 1.
/// </summary>
public int getMaxDegreeOfParallelism() {
return this.maxDegreeOfParallelism;
}

public void setMaxDegreeOfParallelism(int maxDegreeOfParallelism) {
if (maxDegreeOfParallelism > 0) {
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
}
else {
throw new IllegalArgumentException("maxDegreeOfParallelism");
}
}

public BatchClientParallelOptions() {
this.maxDegreeOfParallelism = 1;
}

public BatchClientParallelOptions(int maxDegreeOfParallelism) {
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.azure.batch.interceptor;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ServerTimeoutInterceptor extends RequestInterceptor {

private final int serverTimeout;

public ServerTimeoutInterceptor(int timeout) {
this.serverTimeout = timeout;
this.setHandler(new BatchRequestInterceptHandler() {
@Override
public void modify(Object request) {
Class<?> c = request.getClass();
try {
Method timeoutMethod = c.getMethod("setTimeout", new Class[]{Integer.class});
if (timeoutMethod != null) {
timeoutMethod.invoke(request, serverTimeout);
}
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
}
}
});
}
}
Loading

0 comments on commit 3e1439f

Please sign in to comment.