fix: batch resume session to support resuming session with several 100k tasks (#800)

# Motivation

When a session with more than 400k tasks is resumed, the resume session
RPC crashes because the resulting database request is too large. As a
result, the tasks are neither moved from Paused to Submitted nor
requeued.

# Description

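The status update of the tasks to resume is now batched: tasks are
processed in chunks of up to 100,000, and each chunk is updated and
pushed to the queue before the next one is handled.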
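
As a rough illustration of the batching idea (not the code of this
commit), the sketch below splits a list of task ids into fixed-size
batches and issues one update per batch. `BatchingSketch`, `UpdateBatch`
and `UpdateInBatches` are hypothetical names, the database call is
replaced by a stub, and `Enumerable.Chunk` (available from .NET 6) is
assumed here as a synchronous stand-in for the asynchronous chunking
used in the actual change.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchingSketch
{
  // Hypothetical stand-in for the database call: a real implementation
  // would update the status of every task in the batch.
  public static int UpdateBatch(IReadOnlyCollection<string> taskIds)
    => taskIds.Count;

  // Splits the ids into batches of at most batchSize and issues one
  // update per batch. Returns the number of requests that were made.
  public static int UpdateInBatches(IEnumerable<string> taskIds,
                                    int                 batchSize)
  {
    var requests = 0;
    foreach (var batch in taskIds.Chunk(batchSize))
    {
      UpdateBatch(batch);
      requests++;
    }

    return requests;
  }

  public static void Main()
  {
    var ids = Enumerable.Range(0, 250_000)
                        .Select(i => $"task-{i}");

    // 250,000 ids in batches of 100,000: two full batches and one partial.
    Console.WriteLine(UpdateInBatches(ids, 100_000)); // prints 3
  }
}
```

Each request stays bounded by the batch size, at the cost of issuing
several requests instead of one.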

# Testing

Unit tests are passing and large sessions can be resumed.

# Impact

- The RPC will succeed.
- As batching was introduced, more requests will be sent to the
database and the RPC will take more time.

# Checklist

- [x] My code adheres to the coding and style guidelines of the project.
- [x] I have performed a self-review of my code.
- [ ] I have commented my code, particularly in hard-to-understand
areas.
- [ ] I have made corresponding changes to the documentation.
- [x] I have thoroughly tested my modifications and added tests when
necessary.
- [x] Tests pass locally and in the CI.
- [x] I have assessed the performance impact of my modifications.
aneojgurhem authored Nov 21, 2024
2 parents 4156fca + cd54c6c commit 4b2ccc7
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions Common/src/Storage/TaskLifeCycleHelper.cs
@@ -530,20 +530,25 @@ public static async Task<SessionData> ResumeAsync(ITaskTable taskTable,
                                             .WithCancellation(cancellationToken)
                                             .ConfigureAwait(false))
     {
-      var tasks = await grouping.ToListAsync(cancellationToken)
-                                .ConfigureAwait(false);
+      await foreach (var tasks in grouping.ToChunksAsync(100000,
+                                                         TimeSpan.FromSeconds(1),
+                                                         cancellationToken)
+                                          .ConfigureAwait(false))
+      {
+        var taskIds = tasks.Select(task => task.TaskId)
+                           .AsICollection();

-      var taskIds = tasks.Select(task => task.TaskId)
-                         .AsICollection();
-      await taskTable.UpdateManyTasks(data => data.SessionId == sessionId && data.Status == TaskStatus.Paused && taskIds.Contains(data.TaskId),
-                                      new UpdateDefinition<TaskData>().Set(data => data.Status,
-                                                                           TaskStatus.Submitted),
-                                      cancellationToken)
-                     .ConfigureAwait(false);
-      await pushQueueStorage.PushMessagesAsync(tasks,
-                                               grouping.Key.PartitionId,
-                                               cancellationToken)
-                            .ConfigureAwait(false);
+        await taskTable.UpdateManyTasks(data => data.SessionId == sessionId && data.Status == TaskStatus.Paused && taskIds.Contains(data.TaskId),
+                                        new UpdateDefinition<TaskData>().Set(data => data.Status,
+                                                                             TaskStatus.Submitted),
+                                        cancellationToken)
+                       .ConfigureAwait(false);
+
+        await pushQueueStorage.PushMessagesAsync(tasks,
+                                                 grouping.Key.PartitionId,
+                                                 cancellationToken)
+                              .ConfigureAwait(false);
+      }
     }

     return session;