Skip to content

Commit

Permalink
refactor wait for cancellation token code (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHX authored Jan 5, 2025
1 parent ba8f58e commit 764b34c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 42 deletions.
55 changes: 13 additions & 42 deletions src/Runner.Server/Controllers/MessageController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2281,11 +2281,7 @@ private HookResponse ConvertYaml2(string fileRelativePath, string content, strin
var clone = Clone();
Task.Run(async () => {
try {
try {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(finished.Token, _job.CancelRequest.Token, jobitem.Cancel.Token).Token);
} catch {

}
await Helper.WaitAnyCancellationToken(finished.Token, _job.CancelRequest.Token, jobitem.Cancel.Token);
if(!finished.IsCancellationRequested) {
jobitem.Cancel.Cancel();
foreach(var ji in jobitem.Childs) {
Expand Down Expand Up @@ -2803,14 +2799,10 @@ private HookResponse ConvertYaml2(string fileRelativePath, string content, strin
Task.Run(async () => {
try {
for(int i = 0; i < 2; i++) {
try {
if(i == 0) {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(workflowContext.CancellationToken, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
await Helper.WaitAnyCancellationToken(workflowContext.CancellationToken, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
} else {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
}
} catch {

await Helper.WaitAnyCancellationToken(finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
}
if(finished.IsCancellationRequested) {
return;
Expand Down Expand Up @@ -2927,10 +2919,7 @@ private HookResponse ConvertYaml2(string fileRelativePath, string content, strin
} else {
workflowTraceWriter.Info("{0}", $"Starting Workflow run by concurrency group: {group}");
then();
try {
await Task.Delay(-1, finished.Token);
} catch {
}
await Helper.WaitAnyCancellationToken(finished.Token);
}
cgroup.FinishRunning(centry);
};
Expand All @@ -2948,11 +2937,7 @@ private HookResponse ConvertYaml2(string fileRelativePath, string content, strin
// Needed to avoid a deadlock between caller and reusable workflow
var prerunCancel = new CancellationTokenSource();
Task.Run(async () => {
try {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(workflowContext.CancellationToken, prerunCancel.Token, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
} catch {

}
await Helper.WaitAnyCancellationToken(workflowContext.CancellationToken, prerunCancel.Token, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
if(!prerunCancel.Token.IsCancellationRequested && !finished.Token.IsCancellationRequested) {
workflowTraceWriter.Info("{0}", $"Prerun cancellation");
cancelPendingWorkflow();
Expand Down Expand Up @@ -3989,11 +3974,7 @@ private HookResponse AzureDevopsMain(string fileRelativePath, string content, st
var clone = Clone();
Task.Run(async () => {
try {
try {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(finished.Token, _job.CancelRequest.Token, jobitem.Cancel.Token).Token);
} catch {

}
await Helper.WaitAnyCancellationToken(finished.Token, _job.CancelRequest.Token, jobitem.Cancel.Token);
if(!finished.IsCancellationRequested) {
jobitem.Cancel.Cancel();
foreach(var ji in jobitem.Childs) {
Expand Down Expand Up @@ -4683,14 +4664,11 @@ private HookResponse AzureDevopsMain(string fileRelativePath, string content, st
Task.Run(async () => {
try {
for(int i = 0; i < 2; i++) {
try {

if(i == 0) {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(workflowContext.CancellationToken, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
await Helper.WaitAnyCancellationToken(workflowContext.CancellationToken, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
} else {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
}
} catch {

await Helper.WaitAnyCancellationToken(finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
}
if(finished.IsCancellationRequested) {
return;
Expand Down Expand Up @@ -4796,10 +4774,7 @@ private HookResponse AzureDevopsMain(string fileRelativePath, string content, st
} else {
workflowTraceWriter.Info("{0}", $"Starting Workflow run by concurrency group: {group}");
then();
try {
await Task.Delay(-1, finished.Token);
} catch {
}
await Helper.WaitAnyCancellationToken(finished.Token);
}
cgroup.FinishRunning(centry);
};
Expand All @@ -4817,11 +4792,7 @@ private HookResponse AzureDevopsMain(string fileRelativePath, string content, st
// Needed to avoid a deadlock between caller and reusable workflow
var prerunCancel = new CancellationTokenSource();
Task.Run(async () => {
try {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(workflowContext.CancellationToken, prerunCancel.Token, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None).Token);
} catch {

}
await Helper.WaitAnyCancellationToken(workflowContext.CancellationToken, prerunCancel.Token, finished.Token, workflowContext.ForceCancellationToken ?? CancellationToken.None);
if(!prerunCancel.Token.IsCancellationRequested && !finished.Token.IsCancellationRequested) {
workflowTraceWriter.Info("{0}", $"Prerun cancellation");
cancelPendingWorkflow();
Expand Down Expand Up @@ -6535,7 +6506,7 @@ public async Task<IActionResult> GetMessage(int poolId, Guid sessionId)
// Attempt to mitigate an actions/runner bug, where the runner doesn't send a jobcompleted event if we cancel to early
await Task.Delay(session.DoNotCancelBefore.Value - now, CancellationTokenSource.CreateLinkedTokenSource(jobRunningToken, ts.Token).Token);
}
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(jobRunningToken, ts.Token, job.CancelRequest.Token).Token);
await Helper.WaitAnyCancellationToken(jobRunningToken, ts.Token, job.CancelRequest.Token);
} catch (TaskCanceledException) {
// Connection Reset
if(ts.Token.IsCancellationRequested)
Expand Down Expand Up @@ -6584,7 +6555,7 @@ public async Task<IActionResult> GetMessage(int poolId, Guid sessionId)
}
} else {
try {
await Task.Delay(-1, CancellationTokenSource.CreateLinkedTokenSource(jobRunningToken, ts.Token).Token);
await Helper.WaitAnyCancellationToken(jobRunningToken, ts.Token);
} catch (TaskCanceledException) {
// Connection Reset
if(ts.Token.IsCancellationRequested)
Expand Down
37 changes: 37 additions & 0 deletions src/Runner.Server/Helper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Runner.Server
{
public static class Helper
{
public static async Task WaitAnyCancellationToken(params CancellationToken[] tokens) {
using var allTokens = CancellationTokenSource.CreateLinkedTokenSource(tokens);
using var waitTask = Task.Delay(-1, allTokens.Token);
await Task.WhenAny(waitTask);
}

public static Task RunTaskWithProvider(IServiceProvider serviceProvider, Func<IServiceProvider, Task> func) {
var scope = serviceProvider.CreateScope();
return Task.Run(async () => {
try {
await func(scope.ServiceProvider);
} finally {
scope.Dispose();
}
});
}
public static Task RunTaskWithProvider(IServiceProvider serviceProvider, Action<IServiceProvider> func) {
var scope = serviceProvider.CreateScope();
return Task.Run(() => {
try {
func(scope.ServiceProvider);
} finally {
scope.Dispose();
}
});
}
}
}

0 comments on commit 764b34c

Please sign in to comment.