Skip to content

Commit

Permalink
Fix StartProcessing if cancellation is requested before
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Aug 24, 2024
1 parent 0f95f7d commit 02e3e1d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
31 changes: 26 additions & 5 deletions Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// This file is part of the ArmoniK project
//
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

Expand Down Expand Up @@ -190,7 +190,7 @@ await messageHandler_.DisposeIgnoreErrorAsync(logger_)
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
public async Task StopCancelledTask()
public async Task<bool> StopCancelledTask()
{
using var measure = functionExecutionMetrics_.CountAndTime();
using var activity = activitySource_.StartActivityFromParent(activityContext_,
Expand All @@ -214,8 +214,12 @@ await lateCts_.CancelAsync()
messageHandler_.Status = QueueMessageStatus.Cancelled;
await ReleaseTaskHandler()
.ConfigureAwait(false);

return true;
}
}

return false;
}

/// <summary>
Expand Down Expand Up @@ -766,6 +770,23 @@ await taskTable_.StartTask(taskData_,
"Task already in a final state, removing it from the queue");
throw;
}
// If an ArmoniKException is thrown, check if this was because
// the task has been canceled. Use standard error management otherwise.
catch (ArmoniKException e)
{
if (await StopCancelledTask()
.ConfigureAwait(false))
{
earlyCts_.Token.ThrowIfCancellationRequested();
}
else
{
await HandleErrorRequeueAsync(e,
taskData_,
earlyCts_.Token)
.ConfigureAwait(false);
}
}
catch (Exception e)
{
await HandleErrorRequeueAsync(e,
Expand Down
14 changes: 11 additions & 3 deletions Common/tests/Pollster/TaskHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,7 @@ await testServiceProvider.TaskTable.GetTaskStatus(taskId)
}

[Test]
public async Task CancelLongTaskShouldSucceed()
public async Task CancelLongTaskShouldSucceed([Values] bool before)
{
var sqmh = new SimpleQueueMessageHandler
{
Expand Down Expand Up @@ -1573,7 +1573,9 @@ public async Task CancelLongTaskShouldSucceed()
await testServiceProvider.TaskHandler.PreProcessing()
.ConfigureAwait(false);

var exec = testServiceProvider.TaskHandler.ExecuteTask();
var exec = before
? testServiceProvider.TaskHandler.ExecuteTask()
: Task.CompletedTask;

// Cancel task for test

Expand All @@ -1587,6 +1589,11 @@ await testServiceProvider.TaskTable.CancelTaskAsync(new List<string>
CancellationToken.None)
.ConfigureAwait(false);

if (!before)
{
exec = testServiceProvider.TaskHandler.ExecuteTask();
}

await Task.Delay(TimeSpan.FromMilliseconds(200))
.ConfigureAwait(false);

Expand All @@ -1600,7 +1607,8 @@ await testServiceProvider.TaskHandler.StopCancelledTask()
await testServiceProvider.TaskHandler.StopCancelledTask()
.ConfigureAwait(false);

Assert.ThrowsAsync<TaskCanceledException>(() => exec);
Assert.That(() => exec,
Throws.InstanceOf<OperationCanceledException>());

Assert.AreEqual(TaskStatus.Cancelling,
await testServiceProvider.TaskTable.GetTaskStatus(taskId)
Expand Down

0 comments on commit 02e3e1d

Please sign in to comment.