Skip to content

Commit

Permalink
Adding more scheduling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-buttner committed Mar 21, 2024
1 parent ae6d5b4 commit d6863f5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,6 @@ public void enqueue(RequestTask task) {
}
}

private TimeValue getScheduleTime() {
// TODO return result from rate limiter
return TimeValue.ZERO;
}

private void onFinishedExecutingTask() {
threadRunning.release();
try {
if (isShutdown()) {
notifyRequestsOfShutdown();
} else {
checkForTask();
}
} finally {
phaser.arriveAndDeregister();
}
}

private void checkForTask() {
// There is a possibility that a request could come in, acquire the semaphore, and complete the task between when
// we peek and when we attempt to acquire the semaphore here. We'll handle that by peeking while attempt to dequeue
Expand All @@ -212,26 +194,61 @@ private void checkForTask() {
}
}

private TimeValue getScheduleTime() {
// TODO return result from rate limiter
return TimeValue.ZERO;
}

private void scheduleRequest(Runnable executableRequest) {
Runnable toRun = () -> {
executableRequest.run();
onFinishedExecutingTask();
};

// TODO get time from rate limiter
var timeDelay = TimeValue.ZERO;
// TODO
if (timeDelay == 0) {
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(toRun);
}
threadPool.schedule(toRun, timeDelay, threadPool.executor(UTILITY_THREAD_POOL_NAME));
}

private void onFinishedExecutingTask() {
threadRunning.release();
try {
checkForTask();
} finally {
phaser.arriveAndDeregister();
}
}

private void handleSingleRequest() {
try {
var task = queue.poll();

// If we have a buggy race condition it might be possible for another thread to get the task
// if that happens we'll just finish without doing anything
// Another scenario where the task would be null would be if we're instructed to shut down and some other
// thread drained the queue already
if (task != null) {
// TODO call request manager.execute and give it the rate limiter
// that call could schedule a new thread so we might still be "running" once it returns
// This could schedule a thread execution in the future, if the request needs to be rate limited
executeTask(task);
} else {
// We don't have a task to run, so release the thread lock
threadRunning.release();
}
} finally {
if (isShutdown()) {
notifyRequestsOfShutdown();
}
phaser.arriveAndDeregister();
}
}

private void executeTask(RejectableTask task) {
try {
phaser.register();
requestManager.execute(task);
requestManager.execute(task, this::scheduleRequest);
} catch (Exception e) {
logger.warn(
format(
Expand Down Expand Up @@ -283,5 +300,6 @@ private void rejectTask(RejectableTask task) {
);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.xpack.inference.external.http.retry.RetryingHttpSender;

import java.util.Objects;
import java.util.function.Consumer;

/**
* Handles executing a single inference request at a time.
Expand All @@ -22,7 +23,7 @@ public SingleRequestManager(RetryingHttpSender requestSender) {
this.requestSender = Objects.requireNonNull(requestSender);
}

public void execute(InferenceRequest inferenceRequest) {
public void execute(InferenceRequest inferenceRequest, Consumer<Runnable> requestScheduler) {
if (isNoopRequest(inferenceRequest) || inferenceRequest.hasCompleted()) {
return;
}
Expand Down

0 comments on commit d6863f5

Please sign in to comment.