Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the Bulk Task Create Utility #36204

Merged
merged 4 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
package com.azure.compute.batch;

import com.azure.compute.batch.implementation.TasksImpl;
import com.azure.compute.batch.models.BatchTask;
import com.azure.compute.batch.models.BatchTaskCollection;
import com.azure.compute.batch.models.BatchTaskCreateParameters;
import com.azure.compute.batch.models.BatchTaskListSubtasksResult;
import com.azure.compute.batch.models.TaskAddCollectionResult;
import com.azure.compute.batch.models.*;
import com.azure.core.annotation.Generated;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
Expand All @@ -26,6 +22,11 @@
import com.azure.core.util.DateTimeRfc1123;
import java.time.OffsetDateTime;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Initializes a new instance of the synchronous BatchServiceClient type. */
@ServiceClient(builder = BatchServiceClientBuilder.class)
public final class TaskClient {
Expand Down Expand Up @@ -2104,4 +2105,263 @@ public void reactivate(
}
reactivateWithResponse(jobId, taskId, requestOptions).getValue();
}

/**
* Adds multiple tasks to a job.
*
* @param jobId
* The ID of the job to which to add the task.
* @param taskList
* A list of {@link BatchTaskCreateParameters tasks} to add.
* @throws RuntimeException
* Exception thrown when an error response is received from the
* Batch service or any network exception.
* @throws InterruptedException
* Exception thrown if any thread has interrupted the current
* thread.
*/
public void createTasks(String jobId, List<BatchTaskCreateParameters> taskList)
throws RuntimeException, InterruptedException {
createTasks(jobId, taskList, null);
}

private static class WorkingThread implements Runnable {
static final int MAX_TASKS_PER_REQUEST = 100;
private static final AtomicInteger CURRENT_MAX_TASKS = new AtomicInteger(MAX_TASKS_PER_REQUEST);

TaskClient client;
String jobId;
Queue<BatchTaskCreateParameters> pendingList;
List<TaskAddResult> failures;
volatile Exception exception;
final Object lock;

WorkingThread(TaskClient client, String jobId, Queue<BatchTaskCreateParameters> pendingList,
List<TaskAddResult> failures, Object lock) {
this.client = client;
this.jobId = jobId;
this.pendingList = pendingList;
this.failures = failures;
this.exception = null;
this.lock = lock;
}

public Exception getException() {
return this.exception;
}

/**
* Submits one chunk of tasks to a job.
*
* @param taskList
* A list of {@link BatchTask tasks} to add.
*/
private void submitChunk(List<BatchTaskCreateParameters> taskList) {
try {
TaskAddCollectionResult response = this.client.addCollection(this.jobId,new BatchTaskCollection(taskList));

if (response != null && response.getValue() != null) {
for (TaskAddResult result : response.getValue()) {
if (result.getError() != null) {
if (result.getStatus() == TaskAddStatus.SERVER_ERROR) {
// Server error will be retried
for (BatchTaskCreateParameters batchTaskToCreate : taskList) {
if (batchTaskToCreate.getId().equals(result.getTaskId())) {
pendingList.add(batchTaskToCreate);
break;
}
}
} else if (result.getStatus() == TaskAddStatus.CLIENT_ERROR
&& !result.getError().getMessage().getValue().contains("Status code 409")) {
// Client error will be recorded
failures.add(result);
}
}
}
}
}
/*
* TODO: Track 1 SDK had an autogenerated BatchErrorException which encapsulated a {@link BatchError} - Investigate why custom exception type is not generated
* */
catch (HttpResponseException e) {
// If we get RequestBodyTooLarge could be that we chunked the tasks too large.
// Try decreasing the size unless caused by 1 task.
if (e.getResponse().getStatusCode() == 413 && taskList.size() > 1) {
// Use binary reduction to decrease size of submitted chunks
int midpoint = taskList.size() / 2;
// If the midpoint is less than the CURRENT_MAX_TASKS used to create new chunks,
// attempt to atomically reduce CURRENT_MAX_TASKS.
// In the case where compareAndSet fails, that means that CURRENT_MAX_TASKS which
// was the goal
int max = CURRENT_MAX_TASKS.get();
while (midpoint < max) {
CURRENT_MAX_TASKS.compareAndSet(max, midpoint);
max = CURRENT_MAX_TASKS.get();
}
// Resubmit chunk as a smaller list and requeue remaining tasks.
pendingList.addAll(taskList.subList(midpoint, taskList.size()));
submitChunk(taskList.subList(0, midpoint));
} else {
// Any exception will stop further call
exception = e;
pendingList.addAll(taskList);
}
}
catch (RuntimeException e) {
// Any exception will stop further call
exception = e;
pendingList.addAll(taskList);
}
}

@Override
public void run() {
try {
List<BatchTaskCreateParameters> taskList = new LinkedList<>();

// Take the task from the queue up to MAX_TASKS_PER_REQUEST
int count = 0;
int maxAmount = CURRENT_MAX_TASKS.get();
while (count < maxAmount) {
BatchTaskCreateParameters param = pendingList.poll();
if (param != null) {
taskList.add(param);
count++;
} else {
break;
}
}

if (taskList.size() > 0) {
submitChunk(taskList);
}
} finally {
synchronized (lock) {
// Notify main thread that sub thread finished
lock.notifyAll();
}
}
}
}

/**
* Creates a collection of Tasks to the specified Job.
*
* <p>Note that each Task must have a unique ID.This method can work with multiple threads. The parallel degree can
* be specified by the user. If the server times out or the connection is closed during the request, the request may
* have been partially or fully processed, or not at all. In such cases, the user should re-issue the request. Note
* that it is up to the user to correctly handle failures when re-issuing a request. For example, you should use the
* same Task IDs during a retry so that if the prior operation succeeded, the retry will not create extra Tasks
* unexpectedly. If the response contains any Tasks which failed to add, a client can retry the request. In a retry,
* it is most efficient to resubmit only Tasks that failed to add, and to omit Tasks that were successfully added on
* the first attempt. The maximum lifetime of a Task from addition to completion is 180 days. If a Task has not
* completed within 180 days of being added it will be terminated by the Batch service and left in whatever state it
* was in at that time.
*
* @param jobId
* The ID of the job to which to add the task.
* @param taskList
* A list of {@link BatchTaskCreateParameters tasks} to add.
* @param batchClientParallelOptions
* Option that configure the parallelization of the method.
* @throws RuntimeException
* Exception thrown when an error response is received from the
* Batch service or any network exception.
* @throws InterruptedException
* Exception thrown if any thread has interrupted the current
* thread.
*/
public void createTasks(String jobId, List<BatchTaskCreateParameters> taskList, BatchClientParallelOptions batchClientParallelOptions) throws RuntimeException, InterruptedException{
int threadNumber = 1;

// Get user defined thread number
if(batchClientParallelOptions != null){
threadNumber = batchClientParallelOptions.maxDegreeOfParallelism();
}

final Object lock = new Object();
ConcurrentLinkedQueue<BatchTaskCreateParameters> pendingList = new ConcurrentLinkedQueue<>(taskList);
CopyOnWriteArrayList<TaskAddResult> failures = new CopyOnWriteArrayList<>();

Map<Thread, WorkingThread> threads = new HashMap<>();
Exception innerException = null;

synchronized (lock){
while(!pendingList.isEmpty()){

if (threads.size() < threadNumber) {
// Kick as many as possible add tasks requests by max allowed threads
WorkingThread worker = new WorkingThread(this, jobId, pendingList,
failures, lock);
Thread thread = new Thread(worker);
thread.start();
threads.put(thread, worker);
}else{
lock.wait();

List<Thread> finishedThreads = new ArrayList<>();
for (Map.Entry<Thread, WorkingThread> entry : threads.entrySet()) {
if (entry.getKey().getState() == Thread.State.TERMINATED) {
finishedThreads.add(entry.getKey());
// If any exception is encountered, then stop immediately without waiting for
// remaining active threads.
innerException = entry.getValue().getException();
if (innerException != null) {
break;
}
}
}

// Free the thread pool so we can start more threads to send the remaining add
// tasks requests.
threads.keySet().removeAll(finishedThreads);

// Any errors happened, we stop.
if (innerException != null || !failures.isEmpty()) {
break;
}
}
}
}


// Wait for all remaining threads to finish.
for (Thread t : threads.keySet()) {
t.join();
}

if (innerException == null) {
// Check for errors in any of the threads.
for (Map.Entry<Thread, WorkingThread> entry : threads.entrySet()) {
innerException = entry.getValue().getException();
if (innerException != null) {
break;
}
}
}

if (innerException != null) {
// If an exception happened in any of the threads, throw it.
if (innerException instanceof HttpResponseException) {
throw (HttpResponseException) innerException;
} else if (innerException instanceof RuntimeException) {
// WorkingThread will only catch and store a BatchErrorException or a
// RuntimeException in its run() method.
// WorkingThread.getException() should therefore only return one of these two
// types, making the cast safe.
throw (RuntimeException) innerException;
}
}

if (!failures.isEmpty()) {
// Report any client error with leftover request
List<BatchTaskCreateParameters> notFinished = new ArrayList<>();
for (BatchTaskCreateParameters param : pendingList) {
notFinished.add(param);
}
throw new CreateTasksErrorException("At least one task failed to be added.", failures, notFinished);
}

// We succeed here
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.compute.batch.models;

/**
* Stores options that configure the operation of methods on Batch client parallel operations.
*/
public class BatchClientParallelOptions {

private int maxDegreeOfParallelism;

/**
* Gets the maximum number of concurrent tasks enabled by this {@link BatchClientParallelOptions} instance.
*
* The default value is 1.
* @return The maximum number of concurrent tasks.
*/
public int maxDegreeOfParallelism() {
return this.maxDegreeOfParallelism;
}

/**
* Sets the maximum number of concurrent tasks enabled by this {@link BatchClientParallelOptions} instance.
*
* @param maxDegreeOfParallelism the maximum number of concurrent tasks.
* @return The instance of {@link BatchClientParallelOptions}.
*/
public BatchClientParallelOptions setMaxDegreeOfParallelism(int maxDegreeOfParallelism) {
if (maxDegreeOfParallelism > 0) {
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
} else {
throw new IllegalArgumentException("maxDegreeOfParallelism");
}
return this;
}

/**
* Initializes a new instance of the {@link BatchClientParallelOptions} class with default values.
*/
public BatchClientParallelOptions() {
this.maxDegreeOfParallelism = 1;
}

/**
* Initializes a new instance of the {@link BatchClientParallelOptions} class.
*
* @param maxDegreeOfParallelism the maximum number of concurrent tasks.
*/
public BatchClientParallelOptions(int maxDegreeOfParallelism) {
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.compute.batch.models;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.HttpResponse;

public class BatchErrorException extends HttpResponseException {
/**
* Initializes a new instance of the BatchErrorException class.
*
* @param message the exception message or the response content if a message is not available.
* @param response the HTTP response.
*/
public BatchErrorException(String message, HttpResponse response) {
super(message, response);
}

/**
* Initializes a new instance of the BatchErrorException class.
*
* @param message the exception message or the response content if a message is not available.
* @param response the HTTP response.
* @param value the deserialized response value.
*/
public BatchErrorException(String message, HttpResponse response, BatchError value) {
super(message, response, value);
}

@Override
public BatchError getValue() {
return (BatchError) super.getValue();
}
}
Loading