From 75c71cafe33a1b257fde6aa42fcb0c03b36cf69e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 09:21:48 +0200 Subject: [PATCH 01/24] feat: Task execution pipelining --- Common/src/Pollster/Pollster.cs | 66 +++++++++++----- .../src/Pollster/PostProcessingTaskQueue.cs | 62 +++++++++++++++ Common/src/Pollster/PostProcessor.cs | 60 ++++++++++++++ Common/src/Pollster/RunningTaskProcessor.cs | 76 ++++++++++++++++++ Common/src/Pollster/RunningTaskQueue.cs | 78 +++++++++++++++++++ Common/src/Pollster/TaskHandler.cs | 47 +++++++---- .../Helpers/SimplePullQueueStorageChannel.cs | 73 +++++++++++++++++ Common/tests/Helpers/TestPollsterProvider.cs | 9 +++ Common/tests/Pollster/PollsterTest.cs | 75 ++++++++---------- Common/tests/Pollster/TaskHandlerTest.cs | 30 ++++--- Compute/PollingAgent/src/Program.cs | 4 + 11 files changed, 487 insertions(+), 93 deletions(-) create mode 100644 Common/src/Pollster/PostProcessingTaskQueue.cs create mode 100644 Common/src/Pollster/PostProcessor.cs create mode 100644 Common/src/Pollster/RunningTaskProcessor.cs create mode 100644 Common/src/Pollster/RunningTaskQueue.cs create mode 100644 Common/tests/Helpers/SimplePullQueueStorageChannel.cs diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 9a600fff6..729c68759 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -19,6 +19,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Net; +using System.Runtime.ExceptionServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -56,6 +57,7 @@ public class Pollster : IInitializable private readonly Injection.Options.Pollster pollsterOptions_; private readonly IPullQueueStorage pullQueueStorage_; private readonly IResultTable resultTable_; + private readonly RunningTaskQueue runningTaskQueue_; private readonly ISessionTable sessionTable_; private readonly ISubmitter submitter_; private readonly ITaskProcessingChecker taskProcessingChecker_; @@ -80,7 +82,8 @@ public Pollster(IPullQueueStorage pullQueueStorage, ITaskTable taskTable, ITaskProcessingChecker taskProcessingChecker, IWorkerStreamHandler workerStreamHandler, - IAgentHandler agentHandler) + IAgentHandler agentHandler, + RunningTaskQueue runningTaskQueue) { if (options.MessageBatchSize < 1) { @@ -103,6 +106,7 @@ public Pollster(IPullQueueStorage pullQueueStorage, taskProcessingChecker_ = taskProcessingChecker; workerStreamHandler_ = workerStreamHandler; agentHandler_ = agentHandler; + runningTaskQueue_ = runningTaskQueue; TaskProcessing = ""; ownerPodId_ = LocalIpFinder.LocalIpv4Address(); ownerPodName_ = Dns.GetHostName(); @@ -265,24 +269,37 @@ void RecordError(Exception e) logger_.LogDebug("Start a new Task to process the messageHandler"); - try + while (runningTaskQueue_.RemoveException(out var exception)) { - await using var taskHandler = new TaskHandler(sessionTable_, - taskTable_, - resultTable_, - submitter_, - dataPrefetcher_, - workerStreamHandler_, - message, - taskProcessingChecker_, - ownerPodId_, - ownerPodName_, - activitySource_, - agentHandler_, - logger_, - pollsterOptions_, - cts); + if (exception is RpcException rpcException && TaskHandler.IsStatusFatal(rpcException.StatusCode)) + { + // This exception should stop pollster + Console.WriteLine(exception); + ExceptionDispatchInfo.Capture(exception) + .Throw(); + } + RecordError(exception); + } + + var taskHandler = new TaskHandler(sessionTable_, + taskTable_, + resultTable_, + submitter_, + dataPrefetcher_, + workerStreamHandler_, + message, + taskProcessingChecker_, + ownerPodId_, + ownerPodName_, + activitySource_, + agentHandler_, + logger_, + pollsterOptions_, + cts); + + try + { StopCancelledTask = taskHandler.StopCancelledTask; var precondition = await taskHandler.AcquireTask() @@ -293,11 +310,13 @@ void RecordError(Exception e) await taskHandler.PreProcessing() .ConfigureAwait(false); - await taskHandler.ExecuteTask() - .ConfigureAwait(false); + await runningTaskQueue_.WriteAsync(taskHandler, + cancellationToken) + .ConfigureAwait(false); - await taskHandler.PostProcessing() - .ConfigureAwait(false); + await runningTaskQueue_.WaitForNextWriteAsync(TimeSpan.FromMinutes(1), + cancellationToken) + .ConfigureAwait(false); StopCancelledTask = null; @@ -307,6 +326,11 @@ await taskHandler.PostProcessing() recordedErrors.Dequeue(); } } + else + { + await taskHandler.DisposeAsync() + .ConfigureAwait(false); + } } catch (RpcException e) when (TaskHandler.IsStatusFatal(e.StatusCode)) { diff --git a/Common/src/Pollster/PostProcessingTaskQueue.cs b/Common/src/Pollster/PostProcessingTaskQueue.cs new file mode 100644 index 000000000..e6d77cc51 --- /dev/null +++ b/Common/src/Pollster/PostProcessingTaskQueue.cs @@ -0,0 +1,62 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace ArmoniK.Core.Common.Pollster; + +public class PostProcessingTaskQueue +{ + private readonly Channel channel_ = Channel.CreateBounded(new BoundedChannelOptions(1) + { + Capacity = 1, + FullMode = BoundedChannelFullMode.Wait, + SingleReader = true, + SingleWriter = true, + }); + + private readonly Queue exceptions_ = new(); + + public async Task WriteAsync(TaskHandler handler, + CancellationToken cancellationToken) + => await channel_.Writer.WriteAsync(handler, + cancellationToken) + .ConfigureAwait(false); + + public async Task ReadAsync(CancellationToken cancellationToken) + => await channel_.Reader.ReadAsync(cancellationToken) + .ConfigureAwait(false); + + public void AddException(Exception e) + => exceptions_.Enqueue(e); + + public bool RemoveException([MaybeNullWhen(false)] out Exception e) + { + var r = exceptions_.Count > 0; + + e = r + ? exceptions_.Dequeue() + : null; + + return r; + } +} diff --git a/Common/src/Pollster/PostProcessor.cs b/Common/src/Pollster/PostProcessor.cs new file mode 100644 index 000000000..b207d0939 --- /dev/null +++ b/Common/src/Pollster/PostProcessor.cs @@ -0,0 +1,60 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; + +using Microsoft.Extensions.Hosting; + +namespace ArmoniK.Core.Common.Pollster; + +public class PostProcessor : BackgroundService +{ + private readonly PostProcessingTaskQueue postProcessingTaskQueue_; + public string CurrentTask = string.Empty; + + public PostProcessor(PostProcessingTaskQueue postProcessingTaskQueue) + => postProcessingTaskQueue_ = postProcessingTaskQueue; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + var taskHandler = await postProcessingTaskQueue_.ReadAsync(stoppingToken) + .ConfigureAwait(false); + try + { + CurrentTask = taskHandler.GetAcquiredTask(); + await taskHandler.PostProcessing() + .ConfigureAwait(false); + } + catch (Exception e) + { + postProcessingTaskQueue_.AddException(ExceptionDispatchInfo.Capture(e) + .SourceException); + } + finally + { + await taskHandler.DisposeAsync() + .ConfigureAwait(false); + CurrentTask = string.Empty; + } + } + } +} diff --git a/Common/src/Pollster/RunningTaskProcessor.cs b/Common/src/Pollster/RunningTaskProcessor.cs new file mode 100644 index 000000000..0b0a46ec3 --- /dev/null +++ b/Common/src/Pollster/RunningTaskProcessor.cs @@ -0,0 +1,76 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ArmoniK.Core.Common.Pollster; + +public class RunningTaskProcessor : BackgroundService +{ + private readonly ILogger logger_; + private readonly PostProcessingTaskQueue postProcessingTaskQueue_; + private readonly RunningTaskQueue runningTaskQueue_; + public string CurrentTask = string.Empty; + + public RunningTaskProcessor(RunningTaskQueue runningTaskQueue, + PostProcessingTaskQueue postProcessingTaskQueue, + ILogger logger) + { + runningTaskQueue_ = runningTaskQueue; + postProcessingTaskQueue_ = postProcessingTaskQueue; + logger_ = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + logger_.LogDebug("Start running task processing service"); + while (!stoppingToken.IsCancellationRequested) + { + try + { + while (postProcessingTaskQueue_.RemoveException(out var exception)) + { + runningTaskQueue_.AddException(exception); + } + + var taskHandler = await runningTaskQueue_.ReadAsync(stoppingToken) + .ConfigureAwait(false); + CurrentTask = taskHandler.GetAcquiredTask(); + await taskHandler.ExecuteTask() + .ConfigureAwait(false); + await postProcessingTaskQueue_.WriteAsync(taskHandler, + stoppingToken) + .ConfigureAwait(false); + } + catch (Exception e) + { + runningTaskQueue_.AddException(ExceptionDispatchInfo.Capture(e) + .SourceException); + } + finally + { + CurrentTask = string.Empty; + } + } + } +} diff --git a/Common/src/Pollster/RunningTaskQueue.cs b/Common/src/Pollster/RunningTaskQueue.cs new file mode 100644 index 000000000..58c0a83fc --- /dev/null +++ b/Common/src/Pollster/RunningTaskQueue.cs @@ -0,0 +1,78 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace ArmoniK.Core.Common.Pollster; + +public class RunningTaskQueue +{ + private readonly Channel channel_ = Channel.CreateBounded(new BoundedChannelOptions(1) + { + Capacity = 1, + FullMode = BoundedChannelFullMode.Wait, + SingleReader = false, + SingleWriter = true, + }); + + private readonly Queue exceptions_ = new(); + + public async Task WriteAsync(TaskHandler handler, + CancellationToken cancellationToken) + => await channel_.Writer.WriteAsync(handler, + cancellationToken) + .ConfigureAwait(false); + + public async Task WaitForNextWriteAsync(TimeSpan timeout, + CancellationToken cancellationToken) + { + var cts = new CancellationTokenSource(timeout); + await using var registration = cancellationToken.Register(() => cts.Cancel()); + + await channel_.Writer.WaitToWriteAsync(cts.Token) + .ConfigureAwait(false); + + if (channel_.Reader.TryRead(out var handler)) + { + await handler.DisposeAsync() + .ConfigureAwait(false); + } + } + + public async Task ReadAsync(CancellationToken cancellationToken) + => await channel_.Reader.ReadAsync(cancellationToken) + .ConfigureAwait(false); + + public void AddException(Exception e) + => exceptions_.Enqueue(e); + + public bool RemoveException([MaybeNullWhen(false)] out Exception e) + { + var r = exceptions_.Count > 0; + + e = r + ? exceptions_.Dequeue() + : null; + + return r; + } +} diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 6d58fc8e7..fb87246b5 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -62,6 +62,7 @@ public sealed class TaskHandler : IAsyncDisposable private readonly IWorkerStreamHandler workerStreamHandler_; private IAgent? agent_; private Queue? computeRequestStream_; + private ProcessReply? reply_; private SessionData? sessionData_; private TaskData? taskData_; @@ -506,6 +507,17 @@ await taskTable_.StartTask(taskData_, workerStreamHandler_.StartTaskProcessing(taskData_, workerConnectionCts_.Token); + } + catch (Exception e) + { + await HandleErrorRequeueAsync(e, + taskData_, + cancellationTokenSource_.Token) + .ConfigureAwait(false); + } + + try + { if (workerStreamHandler_.Pipe is null) { throw new ArmoniKException($"{nameof(IWorkerStreamHandler.Pipe)} should not be null"); @@ -523,12 +535,21 @@ await workerStreamHandler_.Pipe.WriteAsync(new ProcessRequest await workerStreamHandler_.Pipe.CompleteAsync() .ConfigureAwait(false); + + // at this point worker requests should have ended + logger_.LogDebug("Wait for task output"); + reply_ = await workerStreamHandler_.Pipe.ReadAsync(workerConnectionCts_.Token) + .ConfigureAwait(false); + + logger_.LogDebug("Stop agent server"); + await agentHandler_.Stop(workerConnectionCts_.Token) + .ConfigureAwait(false); } catch (Exception e) { - await HandleErrorRequeueAsync(e, - taskData_, - cancellationTokenSource_.Token) + await HandleErrorResubmitAsync(e, + taskData_, + cancellationTokenSource_.Token) .ConfigureAwait(false); } } @@ -557,25 +578,21 @@ public async Task PostProcessing() throw new NullReferenceException(nameof(agent_) + " is null."); } + if (reply_ is null) + { + throw new NullReferenceException(nameof(reply_) + " is null."); + } + using var _ = logger_.BeginNamedScope("PostProcessing", ("taskId", messageHandler_.TaskId), ("sessionId", taskData_.SessionId)); try { - // at this point worker requests should have ended - logger_.LogDebug("Wait for task output"); - var reply = await workerStreamHandler_.Pipe.ReadAsync(workerConnectionCts_.Token) - .ConfigureAwait(false); - - logger_.LogDebug("Stop agent server"); - await agentHandler_.Stop(workerConnectionCts_.Token) - .ConfigureAwait(false); - logger_.LogInformation("Process task output of type {type}", - reply.Output.TypeCase); + reply_.Output.TypeCase); - if (reply.Output.TypeCase is Output.TypeOneofCase.Ok) + if (reply_.Output.TypeCase is Output.TypeOneofCase.Ok) { logger_.LogDebug("Complete processing of the request"); await agent_.FinalizeTaskCreation(CancellationToken.None) @@ -584,7 +601,7 @@ await agent_.FinalizeTaskCreation(CancellationToken.None) await submitter_.CompleteTaskAsync(taskData_, false, - reply.Output, + reply_.Output, CancellationToken.None) .ConfigureAwait(false); messageHandler_.Status = QueueMessageStatus.Processed; diff --git a/Common/tests/Helpers/SimplePullQueueStorageChannel.cs b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs new file mode 100644 index 000000000..83c5b5949 --- /dev/null +++ b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs @@ -0,0 +1,73 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +using ArmoniK.Core.Base; +using ArmoniK.Core.Base.DataStructures; + +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace ArmoniK.Core.Common.Tests.Helpers; + +public class SimplePullQueueStorageChannel : IPullQueueStorage +{ + public readonly Channel Channel = System.Threading.Channels.Channel.CreateUnbounded(); + + public Task Check(HealthCheckTag tag) + => Task.FromResult(HealthCheckResult.Healthy()); + + public Task Init(CancellationToken cancellationToken) + => Task.CompletedTask; + + public int MaxPriority + => 10; + + + public async IAsyncEnumerable PullMessagesAsync(int nbMessages, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + foreach (var _ in Enumerable.Range(0, + nbMessages)) + { + if (cancellationToken.IsCancellationRequested) + { + yield break; + } + + IQueueMessageHandler? msg; + + try + { + msg = await Channel.Reader.ReadAsync(cancellationToken) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + yield break; + } + + yield return msg; + } + } +} diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs index 4e478c994..0f9e31eec 100644 --- a/Common/tests/Helpers/TestPollsterProvider.cs +++ b/Common/tests/Helpers/TestPollsterProvider.cs @@ -36,6 +36,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -123,12 +124,17 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, NullLogger.Instance) .AddSingleton(ActivitySource) .AddSingleton(_ => client_) + .AddLogging() .AddSingleton() .AddOption(builder.Configuration, Injection.Options.Submitter.SettingSection) .AddSingleton() .AddSingleton("ownerpodid") .AddSingleton() + .AddHostedService() + .AddHostedService() + .AddSingleton() + .AddSingleton() .AddSingleton() .AddSingleton() .AddOption(builder.Configuration, @@ -141,6 +147,7 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, builder.Services.AddSingleton(computePlanOptions); app_ = builder.Build(); + app_.Start(); ResultTable = app_.Services.GetRequiredService(); TaskTable = app_.Services.GetRequiredService(); @@ -164,6 +171,8 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, public void Dispose() { + app_.StopAsync() + .Wait(); ((IDisposable)app_)?.Dispose(); runner_?.Dispose(); GC.SuppressFinalize(this); diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index be9da9a37..4fd027019 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -468,13 +468,13 @@ public Task CompleteAsync() [TestCase(5000)] // task should be longer than the grace delay public async Task ExecuteTaskShouldSucceed(double delay) { - var mockPullQueueStorage = new Mock(); + var mockPullQueueStorage = new SimplePullQueueStorageChannel(); var waitWorkerStreamHandler = new WaitWorkerStreamHandler(delay); var simpleAgentHandler = new SimpleAgentHandler(); using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler, simpleAgentHandler, - mockPullQueueStorage.Object); + mockPullQueueStorage); var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter, testServiceProvider.PartitionTable, @@ -482,19 +482,15 @@ public async Task ExecuteTaskShouldSucceed(double delay) CancellationToken.None) .ConfigureAwait(false); - mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(), - It.IsAny())) - .Returns(() => new List - { - new SimpleQueueMessageHandler - { - CancellationToken = CancellationToken.None, - Status = QueueMessageStatus.Waiting, - MessageId = Guid.NewGuid() - .ToString(), - TaskId = taskSubmitted, - }, - }.ToAsyncEnumerable()); + await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler + { + CancellationToken = CancellationToken.None, + Status = QueueMessageStatus.Waiting, + MessageId = Guid.NewGuid() + .ToString(), + TaskId = taskSubmitted, + }) + .ConfigureAwait(false); await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); @@ -505,7 +501,9 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.False(testServiceProvider.Pollster.Failed); Assert.True(source.Token.IsCancellationRequested); - Assert.AreEqual(TaskStatus.Completed, + Assert.AreEqual(delay < 1000 + ? TaskStatus.Completed + : TaskStatus.Processing, (await testServiceProvider.TaskTable.GetTaskStatus(new[] { taskSubmitted, @@ -579,14 +577,15 @@ await Task.Delay(TimeSpan.FromMilliseconds(200), Assert.False(testServiceProvider.Pollster.Failed); Assert.True(source.Token.IsCancellationRequested); - Assert.AreEqual(TaskStatus.Cancelled, - (await testServiceProvider.TaskTable.GetTaskStatus(new[] - { - taskSubmitted, - }, - CancellationToken.None) - .ConfigureAwait(false)).Single() - .Status); + Assert.That((await testServiceProvider.TaskTable.GetTaskStatus(new[] + { + taskSubmitted, + }, + CancellationToken.None) + .ConfigureAwait(false)).Single() + .Status, + Is.AnyOf(TaskStatus.Cancelled, + TaskStatus.Cancelling)); Assert.AreEqual(string.Empty, testServiceProvider.Pollster.TaskProcessing); Assert.AreSame(string.Empty, @@ -672,7 +671,7 @@ await pollster.Init(CancellationToken.None) [Test] public async Task UnavailableWorkerShouldFail() { - var mockPullQueueStorage = new Mock(); + var mockPullQueueStorage = new SimplePullQueueStorageChannel(); var simpleAgentHandler = new SimpleAgentHandler(); var mockStreamHandlerFail = new Mock(); @@ -683,7 +682,7 @@ public async Task UnavailableWorkerShouldFail() using var testServiceProvider = new TestPollsterProvider(mockStreamHandlerFail.Object, simpleAgentHandler, - mockPullQueueStorage.Object); + mockPullQueueStorage); var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter, testServiceProvider.PartitionTable, @@ -691,19 +690,15 @@ public async Task UnavailableWorkerShouldFail() CancellationToken.None) .ConfigureAwait(false); - mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(), - It.IsAny())) - .Returns(() => new List - { - new SimpleQueueMessageHandler - { - CancellationToken = CancellationToken.None, - Status = QueueMessageStatus.Waiting, - MessageId = Guid.NewGuid() - .ToString(), - TaskId = taskSubmitted, - }, - }.ToAsyncEnumerable()); + await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler + { + CancellationToken = CancellationToken.None, + Status = QueueMessageStatus.Waiting, + MessageId = Guid.NewGuid() + .ToString(), + TaskId = taskSubmitted, + }) + .ConfigureAwait(false); await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); @@ -711,8 +706,6 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); - Assert.True(testServiceProvider.Pollster.Failed); - Assert.False(source.Token.IsCancellationRequested); Assert.AreEqual(TaskStatus.Submitted, (await testServiceProvider.TaskTable.GetTaskStatus(new[] diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index 967dea8b4..338378612 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -1116,12 +1116,10 @@ public static IEnumerable TestCaseOuptut await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - await testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false); - cancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(1500)); - Assert.ThrowsAsync(() => testServiceProvider.TaskHandler.PostProcessing()); + Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask() + .ConfigureAwait(false)); return ((await testServiceProvider.TaskTable.GetTaskStatus(new[] { @@ -1230,11 +1228,15 @@ public async Task ExecuteTaskUntilErrorShouldSucceed() await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - await testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false); - Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.PostProcessing() - .ConfigureAwait(false)); + Assert.ThrowsAsync(async () => + { + await testServiceProvider.TaskHandler.ExecuteTask() + .ConfigureAwait(false); + + await testServiceProvider.TaskHandler.PostProcessing() + .ConfigureAwait(false); + }); taskData = await testServiceProvider.TaskTable.ReadTaskAsync(taskId, @@ -1376,10 +1378,7 @@ public async Task ExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow< await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - await testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false); - - Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.PostProcessing() + Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask() .ConfigureAwait(false)); var taskData = await testServiceProvider.TaskTable.ReadTaskAsync(taskId, @@ -1501,8 +1500,8 @@ public async Task CancelLongTaskShouldSucceed() await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - await testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false); + var exec = testServiceProvider.TaskHandler.ExecuteTask() + .ConfigureAwait(false); // Cancel task for test @@ -1529,8 +1528,7 @@ await testServiceProvider.TaskHandler.StopCancelledTask() await testServiceProvider.TaskHandler.StopCancelledTask() .ConfigureAwait(false); - Assert.That(testServiceProvider.TaskHandler.PostProcessing, - Throws.InstanceOf()); + Assert.ThrowsAsync(async () => await exec); Assert.AreEqual(TaskStatus.Cancelling, (await testServiceProvider.TaskTable.GetTaskStatus(new[] diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs index ac828ca04..bb8f8356b 100644 --- a/Compute/PollingAgent/src/Program.cs +++ b/Compute/PollingAgent/src/Program.cs @@ -96,6 +96,10 @@ public static async Task Main(string[] args) .AddLocalStorage(builder.Configuration, logger.GetLogger()) .AddHostedService() + .AddHostedService() + .AddHostedService() + .AddSingleton() + .AddSingleton() .AddSingletonWithHealthCheck(nameof(Common.Pollster.Pollster)) .AddSingleton(logger) .AddSingleton() From e5104483621751e263a44924ddf04507f39941d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 15:05:11 +0200 Subject: [PATCH 02/24] fix: properly manage task handler lifecycle in RunningTaskProcessor --- Common/src/Pollster/RunningTaskProcessor.cs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/Common/src/Pollster/RunningTaskProcessor.cs b/Common/src/Pollster/RunningTaskProcessor.cs index 0b0a46ec3..c8ca42e9a 100644 --- a/Common/src/Pollster/RunningTaskProcessor.cs +++ b/Common/src/Pollster/RunningTaskProcessor.cs @@ -55,12 +55,21 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var taskHandler = await runningTaskQueue_.ReadAsync(stoppingToken) .ConfigureAwait(false); - CurrentTask = taskHandler.GetAcquiredTask(); - await taskHandler.ExecuteTask() - .ConfigureAwait(false); - await postProcessingTaskQueue_.WriteAsync(taskHandler, - stoppingToken) - .ConfigureAwait(false); + try + { + CurrentTask = taskHandler.GetAcquiredTask(); + await taskHandler.ExecuteTask() + .ConfigureAwait(false); + await postProcessingTaskQueue_.WriteAsync(taskHandler, + stoppingToken) + .ConfigureAwait(false); + } + catch (Exception) + { + await taskHandler.DisposeAsync() + .ConfigureAwait(false); + throw; + } } catch (Exception e) { From 5b9e4dbbfc3b08465a29ef99c2da450a69187298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 21:01:23 +0200 Subject: [PATCH 03/24] refactor: task deduplication and cancellation adapted to task pipelining --- Common/src/Pollster/Pollster.cs | 73 ++++++++++--------- Common/src/Pollster/TaskHandler.cs | 4 + .../TaskProcessingCheckerClient.cs | 4 +- .../tests/Helpers/TestTaskHandlerProvider.cs | 26 +++++-- Common/tests/Pollster/PollsterTest.cs | 24 ++---- Compute/PollingAgent/src/Program.cs | 12 ++- 6 files changed, 76 insertions(+), 67 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 729c68759..af7e49e63 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -45,28 +45,28 @@ namespace ArmoniK.Core.Common.Pollster; public class Pollster : IInitializable { - private readonly ActivitySource activitySource_; - private readonly IAgentHandler agentHandler_; - private readonly DataPrefetcher dataPrefetcher_; - private readonly IHostApplicationLifetime lifeTime_; - private readonly ILogger logger_; - private readonly int messageBatchSize_; - private readonly IObjectStorage objectStorage_; - private readonly string ownerPodId_; - private readonly string ownerPodName_; - private readonly Injection.Options.Pollster pollsterOptions_; - private readonly IPullQueueStorage pullQueueStorage_; - private readonly IResultTable resultTable_; - private readonly RunningTaskQueue runningTaskQueue_; - private readonly ISessionTable sessionTable_; - private readonly ISubmitter submitter_; - private readonly ITaskProcessingChecker taskProcessingChecker_; - private readonly ITaskTable taskTable_; - private readonly IWorkerStreamHandler workerStreamHandler_; - private bool endLoopReached_; - private HealthCheckResult? healthCheckFailedResult_; - public Func? StopCancelledTask; - public string TaskProcessing; + private readonly ActivitySource activitySource_; + private readonly IAgentHandler agentHandler_; + private readonly DataPrefetcher dataPrefetcher_; + private readonly IHostApplicationLifetime lifeTime_; + private readonly ILogger logger_; + private readonly int messageBatchSize_; + private readonly IObjectStorage objectStorage_; + private readonly string ownerPodId_; + private readonly string ownerPodName_; + private readonly Injection.Options.Pollster pollsterOptions_; + private readonly IPullQueueStorage pullQueueStorage_; + private readonly IResultTable resultTable_; + private readonly RunningTaskQueue runningTaskQueue_; + private readonly ISessionTable sessionTable_; + private readonly ISubmitter submitter_; + private readonly ITaskProcessingChecker taskProcessingChecker_; + private readonly Dictionary taskProcessingDict_ = new(); + private readonly ITaskTable taskTable_; + private readonly IWorkerStreamHandler workerStreamHandler_; + private bool endLoopReached_; + private HealthCheckResult? healthCheckFailedResult_; + public Pollster(IPullQueueStorage pullQueueStorage, DataPrefetcher dataPrefetcher, @@ -107,12 +107,24 @@ public Pollster(IPullQueueStorage pullQueueStorage, workerStreamHandler_ = workerStreamHandler; agentHandler_ = agentHandler; runningTaskQueue_ = runningTaskQueue; - TaskProcessing = ""; ownerPodId_ = LocalIpFinder.LocalIpv4Address(); ownerPodName_ = Dns.GetHostName(); Failed = false; } + public Func StopCancelledTask + => async () => + { + foreach (var taskHandler in taskProcessingDict_) + { + await taskHandler.Value.StopCancelledTask() + .ConfigureAwait(false); + } + }; + + public ICollection TaskProcessing + => taskProcessingDict_.Keys; + /// /// Is true when the MainLoop exited with an error /// Used in Unit tests @@ -259,7 +271,7 @@ void RecordError(Exception e) ("messageHandler", message.MessageId), ("taskId", message.TaskId), ("ownerPodId", ownerPodId_)); - TaskProcessing = message.TaskId; + // ReSharper disable once ExplicitCallerInfoArgument using var activity = activitySource_.StartActivity("ProcessQueueMessage"); activity?.SetBaggage("TaskId", @@ -296,12 +308,14 @@ void RecordError(Exception e) agentHandler_, logger_, pollsterOptions_, + () => taskProcessingDict_.Remove(message.TaskId), cts); + taskProcessingDict_.TryAdd(message.TaskId, + taskHandler); + try { - StopCancelledTask = taskHandler.StopCancelledTask; - var precondition = await taskHandler.AcquireTask() .ConfigureAwait(false); @@ -318,8 +332,6 @@ await runningTaskQueue_.WaitForNextWriteAsync(TimeSpan.FromMinutes(1), cancellationToken) .ConfigureAwait(false); - StopCancelledTask = null; - // If the task was successful, we can remove a failure if (recordedErrors.Count > 0) { @@ -341,11 +353,6 @@ await taskHandler.DisposeAsync() { RecordError(e); } - finally - { - StopCancelledTask = null; - TaskProcessing = string.Empty; - } } } catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable) diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index fb87246b5..8acab8b62 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -49,6 +49,7 @@ public sealed class TaskHandler : IAsyncDisposable private readonly DataPrefetcher dataPrefetcher_; private readonly ILogger logger_; private readonly IQueueMessageHandler messageHandler_; + private readonly Action onDispose_; private readonly string ownerPodId_; private readonly string ownerPodName_; private readonly CancellationTokenRegistration reg1_; @@ -80,6 +81,7 @@ public TaskHandler(ISessionTable sessionTable, IAgentHandler agentHandler, ILogger logger, Injection.Options.Pollster pollsterOptions, + Action onDispose, CancellationTokenSource cancellationTokenSource) { sessionTable_ = sessionTable; @@ -93,6 +95,7 @@ public TaskHandler(ISessionTable sessionTable, activitySource_ = activitySource; agentHandler_ = agentHandler; logger_ = logger; + onDispose_ = onDispose; ownerPodId_ = ownerPodId; ownerPodName_ = ownerPodName; taskData_ = null; @@ -122,6 +125,7 @@ public async ValueTask DisposeAsync() ("taskId", messageHandler_.TaskId), ("sessionId", taskData_?.SessionId ?? "")); + onDispose_.Invoke(); logger_.LogDebug("MessageHandler status is {status}", messageHandler_.Status); await messageHandler_.DisposeAsync() diff --git a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs index af2f3f5f3..180274e2e 100644 --- a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs +++ b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs @@ -16,6 +16,7 @@ // along with this program. If not, see . using System; +using System.Linq; using System.Net.Http; using System.Threading; using System.Threading.Tasks; @@ -50,7 +51,8 @@ public async Task Check(string taskId, .ConfigureAwait(false); logger_.LogDebug("Result from other polling agent: {result}", result); - return result.Equals(taskId); + return result.Split(",") + .Contains(taskId); } catch (InvalidOperationException ex) { diff --git a/Common/tests/Helpers/TestTaskHandlerProvider.cs b/Common/tests/Helpers/TestTaskHandlerProvider.cs index d0521bee9..077c3ba67 100644 --- a/Common/tests/Helpers/TestTaskHandlerProvider.cs +++ b/Common/tests/Helpers/TestTaskHandlerProvider.cs @@ -135,15 +135,27 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, Injection.Options.Submitter.SettingSection) .AddOption(builder.Configuration, Injection.Options.Pollster.SettingSection) - .AddSingleton(cancellationTokenSource) .AddSingleton() - .AddSingleton("ownerpodid") - .AddSingleton() + .AddSingleton(provider => new TaskHandler(provider.GetRequiredService(), + provider.GetRequiredService(), + provider.GetRequiredService(), + provider.GetRequiredService(), + provider.GetRequiredService(), + workerStreamHandler, + queueStorage, + provider.GetRequiredService(), + "ownerpodid", + "ownerpodname", + provider.GetRequiredService(), + agentHandler, + provider.GetRequiredService(), + provider.GetRequiredService(), + () => + { + }, + cancellationTokenSource)) .AddSingleton() - .AddSingleton() - .AddSingleton(workerStreamHandler) - .AddSingleton(agentHandler) - .AddSingleton(queueStorage); + .AddSingleton(); if (inputTaskTable is not null) { diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index 4fd027019..416b92c67 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -402,10 +402,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); Assert.True(source.Token.IsCancellationRequested); - Assert.AreEqual(string.Empty, + Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); - Assert.AreSame(string.Empty, - testServiceProvider.Pollster.TaskProcessing); } public class WaitWorkerStreamHandler : IWorkerStreamHandler @@ -511,10 +509,6 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) CancellationToken.None) .ConfigureAwait(false)).Single() .Status); - Assert.AreEqual(string.Empty, - testServiceProvider.Pollster.TaskProcessing); - Assert.AreSame(string.Empty, - testServiceProvider.Pollster.TaskProcessing); } [Test] @@ -570,7 +564,7 @@ await Task.Delay(TimeSpan.FromMilliseconds(200), CancellationToken.None) .ConfigureAwait(false); - await testServiceProvider.Pollster.StopCancelledTask!.Invoke() + await testServiceProvider.Pollster.StopCancelledTask.Invoke() .ConfigureAwait(false); Assert.DoesNotThrowAsync(() => mainLoopTask); @@ -586,10 +580,6 @@ await Task.Delay(TimeSpan.FromMilliseconds(200), .Status, Is.AnyOf(TaskStatus.Cancelled, TaskStatus.Cancelling)); - Assert.AreEqual(string.Empty, - testServiceProvider.Pollster.TaskProcessing); - Assert.AreSame(string.Empty, - testServiceProvider.Pollster.TaskProcessing); } public static IEnumerable ExecuteTooManyErrorShouldFailTestCase @@ -661,10 +651,8 @@ await pollster.Init(CancellationToken.None) Assert.DoesNotThrowAsync(() => pollster.MainLoop(source.Token)); Assert.True(pollster.Failed); Assert.False(source.Token.IsCancellationRequested); - Assert.AreEqual(string.Empty, - pollster.TaskProcessing); - Assert.AreSame(string.Empty, - pollster.TaskProcessing); + Assert.AreEqual(Array.Empty(), + testServiceProvider.Pollster.TaskProcessing); } @@ -715,9 +703,7 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) CancellationToken.None) .ConfigureAwait(false)).Single() .Status); - Assert.AreEqual(string.Empty, + Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); - Assert.AreSame(string.Empty, - testServiceProvider.Pollster.TaskProcessing); } } diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs index bb8f8356b..a329ab9ea 100644 --- a/Compute/PollingAgent/src/Program.cs +++ b/Compute/PollingAgent/src/Program.cs @@ -186,19 +186,17 @@ public static async Task Main(string[] args) }); endpoints.MapGet("/taskprocessing", - () => Task.FromResult(app.Services.GetRequiredService() - .TaskProcessing)); + () => Task.FromResult(string.Join(",", + app.Services.GetRequiredService() + .TaskProcessing))); endpoints.MapGet("/stopcancelledtask", async () => { var stopCancelledTask = app.Services.GetRequiredService() .StopCancelledTask; - if (stopCancelledTask != null) - { - await stopCancelledTask.Invoke() - .ConfigureAwait(false); - } + await stopCancelledTask.Invoke() + .ConfigureAwait(false); }); }); From 2fd662082a012942b45b264366fb0dca132d09b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 22:05:57 +0200 Subject: [PATCH 04/24] tests: more robust CancelLongTaskShouldSucceed --- Common/tests/Pollster/PollsterTest.cs | 29 +++++++++++++-------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index 416b92c67..ffa0216fa 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -514,13 +514,13 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) [Test] public async Task CancelLongTaskShouldSucceed() { - var mockPullQueueStorage = new Mock(); + var mockPullQueueStorage = new SimplePullQueueStorageChannel(); var waitWorkerStreamHandler = new ExceptionWorkerStreamHandler(15000); var simpleAgentHandler = new SimpleAgentHandler(); using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler, simpleAgentHandler, - mockPullQueueStorage.Object); + mockPullQueueStorage); var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter, testServiceProvider.PartitionTable, @@ -528,19 +528,15 @@ public async Task CancelLongTaskShouldSucceed() CancellationToken.None) .ConfigureAwait(false); - mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(), - It.IsAny())) - .Returns(() => new List - { - new SimpleQueueMessageHandler - { - CancellationToken = CancellationToken.None, - Status = QueueMessageStatus.Waiting, - MessageId = Guid.NewGuid() - .ToString(), - TaskId = taskSubmitted, - }, - }.ToAsyncEnumerable()); + await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler + { + CancellationToken = CancellationToken.None, + Status = QueueMessageStatus.Waiting, + MessageId = Guid.NewGuid() + .ToString(), + TaskId = taskSubmitted, + }) + .ConfigureAwait(false); await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); @@ -580,6 +576,9 @@ await testServiceProvider.Pollster.StopCancelledTask.Invoke() .Status, Is.AnyOf(TaskStatus.Cancelled, TaskStatus.Cancelling)); + + Assert.AreEqual(Array.Empty(), + testServiceProvider.Pollster.TaskProcessing); } public static IEnumerable ExecuteTooManyErrorShouldFailTestCase From 69412ab35aa016e5ae8ddd85edd3aa4dbaf6bda9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 22:41:52 +0200 Subject: [PATCH 05/24] fix: manage tasks already in a final state during acquisition --- Adaptors/Memory/src/TaskTable.cs | 20 ++++++--- Adaptors/MongoDB/src/TaskTable.cs | 18 ++++---- .../TaskAlreadyInFinalStateException.cs | 40 ++++++++++++++++++ Common/src/Pollster/TaskHandler.cs | 22 +++++----- Common/tests/TestBase/TaskTableTestBase.cs | 42 +++++++++++++++++++ 5 files changed, 118 insertions(+), 24 deletions(-) create mode 100644 Common/src/Exceptions/TaskAlreadyInFinalStateException.cs diff --git a/Adaptors/Memory/src/TaskTable.cs b/Adaptors/Memory/src/TaskTable.cs index e3f1facef..7cc8bad27 100644 --- a/Adaptors/Memory/src/TaskTable.cs +++ b/Adaptors/Memory/src/TaskTable.cs @@ -110,12 +110,20 @@ public Task StartTask(TaskData taskData, taskId2TaskData_.AddOrUpdate(taskData.TaskId, _ => throw new TaskNotFoundException($"Key '{taskData.TaskId}' not found"), (_, - data) => data with - { - Status = TaskStatus.Processing, - StartDate = taskData.StartDate, - PodTtl = taskData.PodTtl, - }); + data) => + { + if (data.Status is TaskStatus.Error or TaskStatus.Completed or TaskStatus.Retried or TaskStatus.Cancelled) + { + throw new TaskAlreadyInFinalStateException($"{taskData.TaskId} is already in a final state : {data.Status}"); + } + + return data with + { + Status = TaskStatus.Processing, + StartDate = taskData.StartDate, + PodTtl = taskData.PodTtl, + }; + }); return Task.CompletedTask; } diff --git a/Adaptors/MongoDB/src/TaskTable.cs b/Adaptors/MongoDB/src/TaskTable.cs index 8de871b35..65c0ba285 100644 --- a/Adaptors/MongoDB/src/TaskTable.cs +++ b/Adaptors/MongoDB/src/TaskTable.cs @@ -32,6 +32,7 @@ using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; using ArmoniK.Core.Common.Storage; +using ArmoniK.Utils; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; @@ -146,7 +147,8 @@ public async Task StartTask(TaskData taskData, Logger.LogInformation("update task {taskId} to status {status}", taskData.TaskId, TaskStatus.Processing); - var res = await taskCollection.UpdateManyAsync(x => x.TaskId == taskData.TaskId && x.Status != TaskStatus.Completed && x.Status != TaskStatus.Cancelled, + var res = await taskCollection.UpdateManyAsync(x => x.TaskId == taskData.TaskId && x.Status != TaskStatus.Completed && x.Status != TaskStatus.Cancelled && + x.Status != TaskStatus.Error && x.Status != TaskStatus.Retried, updateDefinition, cancellationToken: cancellationToken) .ConfigureAwait(false); @@ -154,19 +156,19 @@ public async Task StartTask(TaskData taskData, switch (res.MatchedCount) { case 0: - var taskStatus = await GetTaskStatus(new[] - { - taskData.TaskId, - }, - cancellationToken) - .ConfigureAwait(false); + var taskStatus = (await GetTaskStatus(new[] + { + taskData.TaskId, + }, + cancellationToken) + .ConfigureAwait(false)).AsICollection(); if (!taskStatus.Any()) { throw new TaskNotFoundException($"Task {taskData.TaskId} not found"); } - throw new ArmoniKException($"Task already in a terminal state - {taskStatus.Single()} to {TaskStatus.Processing}"); + throw new TaskAlreadyInFinalStateException($"Task already in a terminal state - {taskStatus.Single()} to {TaskStatus.Processing}"); case > 1: throw new ArmoniKException("Multiple tasks modified"); } diff --git a/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs b/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs new file mode 100644 index 000000000..8c54e70fb --- /dev/null +++ b/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs @@ -0,0 +1,40 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; + +namespace ArmoniK.Core.Common.Exceptions; + +[Serializable] +public class TaskAlreadyInFinalStateException : ArmoniKException +{ + public TaskAlreadyInFinalStateException() + { + } + + public TaskAlreadyInFinalStateException(string message) + : base(message) + { + } + + public TaskAlreadyInFinalStateException(string message, + Exception innerException) + : base(message, + innerException) + { + } +} diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 8acab8b62..86abf09cb 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -300,13 +300,6 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, return false; } - if (cancellationTokenSource_.IsCancellationRequested) - { - messageHandler_.Status = QueueMessageStatus.Postponed; - logger_.LogDebug("Dependencies resolved but execution cancellation requested"); - return false; - } - taskData_ = taskData_ with { OwnerPodId = ownerPodId_, @@ -314,9 +307,18 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, AcquisitionDate = DateTime.UtcNow, ReceptionDate = messageHandler_.ReceptionDateTime, }; - taskData_ = await taskTable_.AcquireTask(taskData_, - CancellationToken.None) - .ConfigureAwait(false); + try + { + taskData_ = await taskTable_.AcquireTask(taskData_, + CancellationToken.None) + .ConfigureAwait(false); + } + catch (TaskAlreadyInFinalStateException) + { + messageHandler_.Status = QueueMessageStatus.Processed; + logger_.LogDebug("Task already in a final state"); + return false; + } if (cancellationTokenSource_.IsCancellationRequested) { diff --git a/Common/tests/TestBase/TaskTableTestBase.cs b/Common/tests/TestBase/TaskTableTestBase.cs index ff6b0db59..69125bd1e 100644 --- a/Common/tests/TestBase/TaskTableTestBase.cs +++ b/Common/tests/TestBase/TaskTableTestBase.cs @@ -1167,6 +1167,48 @@ public void StartTaskShouldFail() } } + [Test] + [TestCase(TaskStatus.Completed)] + [TestCase(TaskStatus.Retried)] + [TestCase(TaskStatus.Error)] + [TestCase(TaskStatus.Cancelled)] + public async Task StartTaskInFinalStateShouldThrow(TaskStatus status) + { + if (RunTests) + { + var taskId = Guid.NewGuid() + .ToString(); + + await TaskTable!.CreateTasks(new[] + { + new TaskData("session", + taskId, + "owner", + "owner", + "payload", + new List(), + new List(), + new List(), + new List(), + status, + Options, + new Output(false, + "")), + }) + .ConfigureAwait(false); + + Assert.ThrowsAsync(async () => + { + await TaskTable!.StartTask(taskSubmittedData_ with + { + TaskId = taskId, + }, + CancellationToken.None) + .ConfigureAwait(false); + }); + } + } + [Test] public void DeleteTaskShouldFail() { From 8f481e7285fd86d5207020f2c088e43607ccc558 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 4 Sep 2023 23:31:47 +0200 Subject: [PATCH 06/24] fix: task acquisition when message is processed in parallel on the same agent --- Common/src/Pollster/Pollster.cs | 16 ++++++++++------ Common/src/Pollster/TaskHandler.cs | 22 ++++++++++------------ 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index af7e49e63..7e4d837d9 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -50,6 +50,7 @@ public class Pollster : IInitializable private readonly DataPrefetcher dataPrefetcher_; private readonly IHostApplicationLifetime lifeTime_; private readonly ILogger logger_; + private readonly ILoggerFactory loggerFactory_; private readonly int messageBatchSize_; private readonly IObjectStorage objectStorage_; private readonly string ownerPodId_; @@ -75,6 +76,7 @@ public Pollster(IPullQueueStorage pullQueueStorage, IHostApplicationLifetime lifeTime, ActivitySource activitySource, ILogger logger, + ILoggerFactory loggerFactory, IObjectStorage objectStorage, IResultTable resultTable, ISubmitter submitter, @@ -92,6 +94,7 @@ public Pollster(IPullQueueStorage pullQueueStorage, } logger_ = logger; + loggerFactory_ = loggerFactory; activitySource_ = activitySource; pullQueueStorage_ = pullQueueStorage; lifeTime_ = lifeTime; @@ -267,10 +270,11 @@ void RecordError(Exception e) await foreach (var message in messages.ConfigureAwait(false)) { - using var scopedLogger = logger_.BeginNamedScope("Prefetch messageHandler", - ("messageHandler", message.MessageId), - ("taskId", message.TaskId), - ("ownerPodId", ownerPodId_)); + var taskHandlerLogger = loggerFactory_.CreateLogger(); + taskHandlerLogger.BeginNamedScope("Prefetch messageHandler", + ("messageHandler", message.MessageId), + ("taskId", message.TaskId), + ("ownerPodId", ownerPodId_)); // ReSharper disable once ExplicitCallerInfoArgument using var activity = activitySource_.StartActivity("ProcessQueueMessage"); @@ -279,7 +283,7 @@ void RecordError(Exception e) activity?.SetBaggage("messageId", message.MessageId); - logger_.LogDebug("Start a new Task to process the messageHandler"); + taskHandlerLogger.LogDebug("Start a new Task to process the messageHandler"); while (runningTaskQueue_.RemoveException(out var exception)) { @@ -306,7 +310,7 @@ void RecordError(Exception e) ownerPodName_, activitySource_, agentHandler_, - logger_, + taskHandlerLogger, pollsterOptions_, () => taskProcessingDict_.Remove(message.TaskId), cts); diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 86abf09cb..b8c34f185 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -307,18 +307,9 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, AcquisitionDate = DateTime.UtcNow, ReceptionDate = messageHandler_.ReceptionDateTime, }; - try - { - taskData_ = await taskTable_.AcquireTask(taskData_, - CancellationToken.None) - .ConfigureAwait(false); - } - catch (TaskAlreadyInFinalStateException) - { - messageHandler_.Status = QueueMessageStatus.Processed; - logger_.LogDebug("Task already in a final state"); - return false; - } + taskData_ = await taskTable_.AcquireTask(taskData_, + CancellationToken.None) + .ConfigureAwait(false); if (cancellationTokenSource_.IsCancellationRequested) { @@ -402,6 +393,13 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, return false; } + if (taskData_.OwnerPodId == ownerPodId_ && taskData_.Status != TaskStatus.Dispatched) + { + logger_.LogInformation("Task is already managed by this agent; message likely to be duplicated"); + messageHandler_.Status = QueueMessageStatus.Processed; + return false; + } + if (cancellationTokenSource_.IsCancellationRequested) { logger_.LogDebug("Task preconditions ok but execution cancellation requested"); From 09d316acf5db6c727d4dc2febaf011bc44b8726f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Tue, 5 Sep 2023 00:21:49 +0200 Subject: [PATCH 07/24] refactor: add message handler id in scopes --- Common/src/Pollster/Pollster.cs | 8 ++++---- Common/src/Pollster/TaskHandler.cs | 5 +++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 7e4d837d9..dc4730ef8 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -271,10 +271,10 @@ void RecordError(Exception e) await foreach (var message in messages.ConfigureAwait(false)) { var taskHandlerLogger = loggerFactory_.CreateLogger(); - taskHandlerLogger.BeginNamedScope("Prefetch messageHandler", - ("messageHandler", message.MessageId), - ("taskId", message.TaskId), - ("ownerPodId", ownerPodId_)); + using var _ = taskHandlerLogger.BeginNamedScope("Prefetch messageHandler", + ("messageHandler", message.MessageId), + ("taskId", message.TaskId), + ("ownerPodId", ownerPodId_)); // ReSharper disable once ExplicitCallerInfoArgument using var activity = activitySource_.StartActivity("ProcessQueueMessage"); diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index b8c34f185..5bab3508b 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -123,6 +123,7 @@ public async ValueTask DisposeAsync() { using var _ = logger_.BeginNamedScope("DisposeAsync", ("taskId", messageHandler_.TaskId), + ("messageHandler", messageHandler_.MessageId), ("sessionId", taskData_?.SessionId ?? "")); onDispose_.Invoke(); @@ -170,6 +171,7 @@ public async Task AcquireTask() { using var activity = activitySource_.StartActivity($"{nameof(AcquireTask)}"); using var _ = logger_.BeginNamedScope("Acquiring task", + ("messageHandler", messageHandler_.MessageId), ("taskId", messageHandler_.TaskId)); try @@ -449,6 +451,7 @@ public async Task PreProcessing() } using var _ = logger_.BeginNamedScope("PreProcessing", + ("messageHandler", messageHandler_.MessageId), ("taskId", messageHandler_.TaskId), ("sessionId", taskData_.SessionId)); logger_.LogDebug("Start prefetch data"); @@ -484,6 +487,7 @@ public async Task ExecuteTask() } using var _ = logger_.BeginNamedScope("TaskExecution", + ("messageHandler", messageHandler_.MessageId), ("taskId", messageHandler_.TaskId), ("sessionId", taskData_.SessionId)); @@ -588,6 +592,7 @@ public async Task PostProcessing() } using var _ = logger_.BeginNamedScope("PostProcessing", + ("messageHandler", messageHandler_.MessageId), ("taskId", messageHandler_.TaskId), ("sessionId", taskData_.SessionId)); From 7aab15bd1e20f00c52a3522db8dffa83f271d3a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 6 Sep 2023 08:28:26 +0200 Subject: [PATCH 08/24] refactor: add an option for timeout before new task acquisition --- Common/src/Injection/Options/Pollster.cs | 6 ++++++ Common/src/Pollster/Pollster.cs | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Common/src/Injection/Options/Pollster.cs b/Common/src/Injection/Options/Pollster.cs index bdead7fb6..8ecbc7d8a 100644 --- a/Common/src/Injection/Options/Pollster.cs +++ b/Common/src/Injection/Options/Pollster.cs @@ -44,4 +44,10 @@ public class Pollster /// Negative values disable the check /// public int MaxErrorAllowed { get; set; } = 5; + + /// + /// Timeout before releasing the current acquired task and acquiring a new one + /// This happens in parallel of the execution of another task + /// + public TimeSpan TimeoutBeforeNextAcquisition { get; set; } = TimeSpan.FromSeconds(10); } diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index dc4730ef8..ad6121ad3 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -332,7 +332,7 @@ await runningTaskQueue_.WriteAsync(taskHandler, cancellationToken) .ConfigureAwait(false); - await runningTaskQueue_.WaitForNextWriteAsync(TimeSpan.FromMinutes(1), + await runningTaskQueue_.WaitForNextWriteAsync(pollsterOptions_.TimeoutBeforeNextAcquisition, cancellationToken) .ConfigureAwait(false); From 6f0115c348ddec838df968b01cfd39ea66b8257e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 6 Sep 2023 08:29:37 +0200 Subject: [PATCH 09/24] refactor: use RethrowWithStacktrace --- Common/src/Pollster/Pollster.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index ad6121ad3..a03add751 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -19,7 +19,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Net; -using System.Runtime.ExceptionServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -290,9 +289,7 @@ void RecordError(Exception e) if (exception is RpcException rpcException && TaskHandler.IsStatusFatal(rpcException.StatusCode)) { // This exception should stop pollster - Console.WriteLine(exception); - ExceptionDispatchInfo.Capture(exception) - .Throw(); + exception.RethrowWithStacktrace(); } RecordError(exception); From dca869f9e5ab34fca060d8775e3ef8b9999560ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 6 Sep 2023 08:30:28 +0200 Subject: [PATCH 10/24] refactor: use concurrent dictionnary to store task handlers --- Common/src/Pollster/Pollster.cs | 48 +++++++++++++++++---------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index a03add751..ee105b5c5 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -16,6 +16,7 @@ // along with this program. If not, see . using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Net; @@ -44,28 +45,28 @@ namespace ArmoniK.Core.Common.Pollster; public class Pollster : IInitializable { - private readonly ActivitySource activitySource_; - private readonly IAgentHandler agentHandler_; - private readonly DataPrefetcher dataPrefetcher_; - private readonly IHostApplicationLifetime lifeTime_; - private readonly ILogger logger_; - private readonly ILoggerFactory loggerFactory_; - private readonly int messageBatchSize_; - private readonly IObjectStorage objectStorage_; - private readonly string ownerPodId_; - private readonly string ownerPodName_; - private readonly Injection.Options.Pollster pollsterOptions_; - private readonly IPullQueueStorage pullQueueStorage_; - private readonly IResultTable resultTable_; - private readonly RunningTaskQueue runningTaskQueue_; - private readonly ISessionTable sessionTable_; - private readonly ISubmitter submitter_; - private readonly ITaskProcessingChecker taskProcessingChecker_; - private readonly Dictionary taskProcessingDict_ = new(); - private readonly ITaskTable taskTable_; - private readonly IWorkerStreamHandler workerStreamHandler_; - private bool endLoopReached_; - private HealthCheckResult? healthCheckFailedResult_; + private readonly ActivitySource activitySource_; + private readonly IAgentHandler agentHandler_; + private readonly DataPrefetcher dataPrefetcher_; + private readonly IHostApplicationLifetime lifeTime_; + private readonly ILogger logger_; + private readonly ILoggerFactory loggerFactory_; + private readonly int messageBatchSize_; + private readonly IObjectStorage objectStorage_; + private readonly string ownerPodId_; + private readonly string ownerPodName_; + private readonly Injection.Options.Pollster pollsterOptions_; + private readonly IPullQueueStorage pullQueueStorage_; + private readonly IResultTable resultTable_; + private readonly RunningTaskQueue runningTaskQueue_; + private readonly ISessionTable sessionTable_; + private readonly ISubmitter submitter_; + private readonly ITaskProcessingChecker taskProcessingChecker_; + private readonly ConcurrentDictionary taskProcessingDict_ = new(); + private readonly ITaskTable taskTable_; + private readonly IWorkerStreamHandler workerStreamHandler_; + private bool endLoopReached_; + private HealthCheckResult? healthCheckFailedResult_; public Pollster(IPullQueueStorage pullQueueStorage, @@ -309,7 +310,8 @@ void RecordError(Exception e) agentHandler_, taskHandlerLogger, pollsterOptions_, - () => taskProcessingDict_.Remove(message.TaskId), + () => taskProcessingDict_.TryRemove(message.TaskId, + out var _), cts); taskProcessingDict_.TryAdd(message.TaskId, From 5b5f6283d4ef7b3d3e1345908dfa5de589273bff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 6 Sep 2023 08:31:26 +0200 Subject: [PATCH 11/24] refactor: if task id is already in dict, task is already processed by this agent --- Common/src/Pollster/Pollster.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index ee105b5c5..0d802600f 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -314,8 +314,15 @@ void RecordError(Exception e) out var _), cts); - taskProcessingDict_.TryAdd(message.TaskId, - taskHandler); + if (!taskProcessingDict_.TryAdd(message.TaskId, + taskHandler)) + { + message.Status = QueueMessageStatus.Processed; + await taskHandler.DisposeAsync() + .ConfigureAwait(false); + continue; + } + try { From 98ef1e598abd79ef23c9487c55fe964aea249cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 6 Sep 2023 08:32:13 +0200 Subject: [PATCH 12/24] fix: properly dispose task handler when preprocessing throws --- Common/src/Pollster/Pollster.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 0d802600f..f504da97c 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -331,8 +331,17 @@ await taskHandler.DisposeAsync() if (precondition) { - await taskHandler.PreProcessing() - .ConfigureAwait(false); + try + { + await taskHandler.PreProcessing() + .ConfigureAwait(false); + } + catch (Exception) + { + await taskHandler.DisposeAsync() + .ConfigureAwait(false); + throw; + } await runningTaskQueue_.WriteAsync(taskHandler, cancellationToken) From dc8f42226305d7232f0ec0df51b8baec6ef55897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Sep 2023 11:21:40 +0200 Subject: [PATCH 13/24] refactor: log task data that cannot be processed when there is an error in LogStatsFromSessionAsync --- Tests/Common/Client/src/GrpcChannelExt.cs | 34 +++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/Tests/Common/Client/src/GrpcChannelExt.cs b/Tests/Common/Client/src/GrpcChannelExt.cs index a35ea0396..e9e8511a4 100644 --- a/Tests/Common/Client/src/GrpcChannelExt.cs +++ b/Tests/Common/Client/src/GrpcChannelExt.cs @@ -120,22 +120,32 @@ public static async Task LogStatsFromSessionAsync(this ChannelBase channel, }) .ConfigureAwait(false)) { - if (taskDetailed.Status is TaskStatus.Completed or TaskStatus.Error or TaskStatus.Retried) + try { - var useRatio = (taskDetailed.EndedAt - taskDetailed.StartedAt).ToTimeSpan() - .TotalMilliseconds / (taskDetailed.EndedAt - taskDetailed.ReceivedAt).ToTimeSpan() - .TotalMilliseconds; - - usageRatio.Add(useRatio); + if (taskDetailed.Status is TaskStatus.Completed or TaskStatus.Error or TaskStatus.Retried) + { + var useRatio = (taskDetailed.EndedAt - taskDetailed.StartedAt).ToTimeSpan() + .TotalMilliseconds / (taskDetailed.EndedAt - taskDetailed.ReceivedAt).ToTimeSpan() + .TotalMilliseconds; + + usageRatio.Add(useRatio); + } + + if (taskDetailed.DataDependencies.Count > 0) + { + taskAggregation.Add(taskDetailed); + } + + taskDependencies.Add(taskDetailed.Id, + taskDetailed); } - - if (taskDetailed.DataDependencies.Count > 0) + catch (Exception e) { - taskAggregation.Add(taskDetailed); + logger.LogError(e, + "Cannot process {@task}", + taskDetailed); + throw; } - - taskDependencies.Add(taskDetailed.Id, - taskDetailed); } var timediff = new List(); From dca1b941639c6bcb6860812d3bdb5c4d9402ec59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Sep 2023 19:41:26 +0200 Subject: [PATCH 14/24] fix: proper rethrows in TaskHandler --- Common/src/Pollster/TaskHandler.cs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 5bab3508b..7bac4280c 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -515,17 +515,7 @@ await taskTable_.StartTask(taskData_, workerStreamHandler_.StartTaskProcessing(taskData_, workerConnectionCts_.Token); - } - catch (Exception e) - { - await HandleErrorRequeueAsync(e, - taskData_, - cancellationTokenSource_.Token) - .ConfigureAwait(false); - } - try - { if (workerStreamHandler_.Pipe is null) { throw new ArmoniKException($"{nameof(IWorkerStreamHandler.Pipe)} should not be null"); @@ -543,10 +533,25 @@ await workerStreamHandler_.Pipe.WriteAsync(new ProcessRequest await workerStreamHandler_.Pipe.CompleteAsync() .ConfigureAwait(false); + } + catch (TaskAlreadyInFinalStateException) + { + messageHandler_.Status = QueueMessageStatus.Processed; + throw; + } + catch (Exception e) + { + await HandleErrorRequeueAsync(e, + taskData_, + cancellationTokenSource_.Token) + .ConfigureAwait(false); + } + try + { // at this point worker requests should have ended logger_.LogDebug("Wait for task output"); - reply_ = await workerStreamHandler_.Pipe.ReadAsync(workerConnectionCts_.Token) + reply_ = await workerStreamHandler_.Pipe!.ReadAsync(workerConnectionCts_.Token) .ConfigureAwait(false); logger_.LogDebug("Stop agent server"); From f91d125eb6713dc0d1b5d9489a70ef8713a15b67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Sep 2023 21:03:28 +0200 Subject: [PATCH 15/24] refactor: better logs for task starting --- Adaptors/MongoDB/src/TaskTable.cs | 2 +- Common/src/Pollster/TaskHandler.cs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Adaptors/MongoDB/src/TaskTable.cs b/Adaptors/MongoDB/src/TaskTable.cs index 65c0ba285..ae4a94cdb 100644 --- a/Adaptors/MongoDB/src/TaskTable.cs +++ b/Adaptors/MongoDB/src/TaskTable.cs @@ -144,7 +144,7 @@ public async Task StartTask(TaskData taskData, taskData.StartDate) .Set(tdm => tdm.PodTtl, taskData.PodTtl); - Logger.LogInformation("update task {taskId} to status {status}", + Logger.LogInformation("Trying to start task {taskId} and update to status {status}", taskData.TaskId, TaskStatus.Processing); var res = await taskCollection.UpdateManyAsync(x => x.TaskId == taskData.TaskId && x.Status != TaskStatus.Completed && x.Status != TaskStatus.Cancelled && diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 7bac4280c..15da005f4 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -534,9 +534,11 @@ await workerStreamHandler_.Pipe.WriteAsync(new ProcessRequest await workerStreamHandler_.Pipe.CompleteAsync() .ConfigureAwait(false); } - catch (TaskAlreadyInFinalStateException) + catch (TaskAlreadyInFinalStateException e) { messageHandler_.Status = QueueMessageStatus.Processed; + logger_.LogWarning(e, + "Task already in a final state, removing it from the queue"); throw; } catch (Exception e) From 286925cc0972661e9add2affe4b7e8c7029e1c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Sep 2023 21:24:39 +0200 Subject: [PATCH 16/24] refactor: retry check on other agent when connection is refused --- .../TaskProcessingCheckerClient.cs | 67 ++++++++++++------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs index 180274e2e..9f4bb0dfd 100644 --- a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs +++ b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs @@ -18,6 +18,7 @@ using System; using System.Linq; using System.Net.Http; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; @@ -44,33 +45,47 @@ public async Task Check(string taskId, logger_.LogTrace("Check if task is processing"); var client = httpClientFactory_.CreateClient(); - try + for (var i = 0; i < 5; i++) { - var result = await client.GetStringAsync("http://" + ownerPodId + ":1080/taskprocessing", - cancellationToken) - .ConfigureAwait(false); - logger_.LogDebug("Result from other polling agent: {result}", - result); - return result.Split(",") - .Contains(taskId); - } - catch (InvalidOperationException ex) - { - logger_.LogWarning(ex, - "Cannot communicate with other pod"); - return false; - } - catch (HttpRequestException ex) - { - logger_.LogWarning(ex, - "Cannot communicate with other pod"); - return false; - } - catch (UriFormatException ex) - { - logger_.LogWarning(ex, - "Invalid other pod hostname"); - return false; + try + { + var result = await client.GetStringAsync("http://" + ownerPodId + ":1080/taskprocessing", + cancellationToken) + .ConfigureAwait(false); + logger_.LogDebug("Result from other polling agent: {result}", + result); + return result.Split(",") + .Contains(taskId); + } + catch (InvalidOperationException ex) + { + logger_.LogWarning(ex, + "Cannot communicate with other pod"); + return false; + } + catch (HttpRequestException ex) when (ex.InnerException is SocketException + { + SocketErrorCode: SocketError.ConnectionRefused, + }) + { + logger_.LogWarning(ex, + "Cannot communicate with other pod"); + } + catch (HttpRequestException ex) + { + logger_.LogWarning(ex, + "Cannot communicate with other pod"); + return false; + } + catch (UriFormatException ex) + { + logger_.LogWarning(ex, + "Invalid other pod hostname"); + return false; + } } + + logger_.LogWarning("Too many tries to communicate with other pod"); + return false; } } From 9dba90b121f20b66f4a638f22eac54a18401a831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 7 Sep 2023 23:24:37 +0200 Subject: [PATCH 17/24] fix: postpone task acquisition when other agent is unavailable if acquisition timeout is not exceeded --- Common/src/Pollster/TaskHandler.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 15da005f4..35814cd7b 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -47,6 +47,7 @@ public sealed class TaskHandler : IAsyncDisposable private readonly IAgentHandler agentHandler_; private readonly CancellationTokenSource cancellationTokenSource_; private readonly DataPrefetcher dataPrefetcher_; + private readonly TimeSpan delayBeforeAcquisition_; private readonly ILogger logger_; private readonly IQueueMessageHandler messageHandler_; private readonly Action onDispose_; @@ -102,6 +103,7 @@ public TaskHandler(ISessionTable sessionTable, sessionData_ = null; token_ = Guid.NewGuid() .ToString(); + delayBeforeAcquisition_ = pollsterOptions.TimeoutBeforeNextAcquisition + TimeSpan.FromSeconds(2); workerConnectionCts_ = new CancellationTokenSource(); cancellationTokenSource_ = new CancellationTokenSource(); @@ -352,6 +354,15 @@ await taskTable_.ReleaseTask(taskData_, .ConfigureAwait(false); logger_.LogInformation("Task is not running on the other polling agent, status : {status}", taskData_.Status); + + if (taskData_.Status is TaskStatus.Dispatched && taskData_.AcquisitionDate < DateTime.UtcNow + delayBeforeAcquisition_) + + { + messageHandler_.Status = QueueMessageStatus.Postponed; + logger_.LogDebug("Wait to exceed acquisition timeout before resubmitting task"); + return false; + } + if (taskData_.Status is TaskStatus.Processing or TaskStatus.Dispatched or TaskStatus.Processed) { logger_.LogDebug("Resubmitting task {task} on another pod", @@ -369,6 +380,7 @@ await submitter_.CompleteTaskAsync(taskData_, .ConfigureAwait(false); } + if (taskData_.Status is TaskStatus.Cancelling) { messageHandler_.Status = QueueMessageStatus.Cancelled; From dc8bd20af3a9a508abbbe9eb0d15b4dfb3f23b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Sun, 10 Sep 2023 20:28:11 +0200 Subject: [PATCH 18/24] refactor: remove unneeded (Exception) in catch --- Common/src/Pollster/Pollster.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index f504da97c..5d24d0eaf 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -336,7 +336,7 @@ await taskHandler.DisposeAsync() await taskHandler.PreProcessing() .ConfigureAwait(false); } - catch (Exception) + catch { await taskHandler.DisposeAsync() .ConfigureAwait(false); From 85b97cb0523f9fe8a45dffd180efeebcff657e55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Sun, 10 Sep 2023 20:35:16 +0200 Subject: [PATCH 19/24] refactor: make StopCancelledTask a function --- Common/src/Pollster/Pollster.cs | 19 +++++++++---------- Common/tests/Pollster/PollsterTest.cs | 2 +- Compute/PollingAgent/src/Program.cs | 10 +++------- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 5d24d0eaf..51f36961f 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -115,16 +115,6 @@ public Pollster(IPullQueueStorage pullQueueStorage, Failed = false; } - public Func StopCancelledTask - => async () => - { - foreach (var taskHandler in taskProcessingDict_) - { - await taskHandler.Value.StopCancelledTask() - .ConfigureAwait(false); - } - }; - public ICollection TaskProcessing => taskProcessingDict_.Keys; @@ -213,6 +203,15 @@ public async Task Check(HealthCheckTag tag) return result; } + public async Task StopCancelledTask() + { + foreach (var taskHandler in taskProcessingDict_.Values) + { + await taskHandler.StopCancelledTask() + .ConfigureAwait(false); + } + } + public async Task MainLoop(CancellationToken cancellationToken) { await Init(cancellationToken) diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index ffa0216fa..bc4b62a63 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -560,7 +560,7 @@ await Task.Delay(TimeSpan.FromMilliseconds(200), CancellationToken.None) .ConfigureAwait(false); - await testServiceProvider.Pollster.StopCancelledTask.Invoke() + await testServiceProvider.Pollster.StopCancelledTask() .ConfigureAwait(false); Assert.DoesNotThrowAsync(() => mainLoopTask); diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs index a329ab9ea..b25657403 100644 --- a/Compute/PollingAgent/src/Program.cs +++ b/Compute/PollingAgent/src/Program.cs @@ -191,13 +191,9 @@ public static async Task Main(string[] args) .TaskProcessing))); endpoints.MapGet("/stopcancelledtask", - async () => - { - var stopCancelledTask = app.Services.GetRequiredService() - .StopCancelledTask; - await stopCancelledTask.Invoke() - .ConfigureAwait(false); - }); + () => app.Services.GetRequiredService() + .StopCancelledTask() + .ConfigureAwait(false)); }); var pushQueueStorage = app.Services.GetRequiredService(); From ec055d76a0a5dedafdbcbb6a6b4484783093f05d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Sun, 10 Sep 2023 20:39:02 +0200 Subject: [PATCH 20/24] refactor: use CancellationTokenSource.CreateLinkedTokenSource instead of Register --- Common/src/Pollster/RunningTaskQueue.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Common/src/Pollster/RunningTaskQueue.cs b/Common/src/Pollster/RunningTaskQueue.cs index 58c0a83fc..3c86af9ed 100644 --- a/Common/src/Pollster/RunningTaskQueue.cs +++ b/Common/src/Pollster/RunningTaskQueue.cs @@ -45,8 +45,8 @@ public async Task WriteAsync(TaskHandler handler, public async Task WaitForNextWriteAsync(TimeSpan timeout, CancellationToken cancellationToken) { - var cts = new CancellationTokenSource(timeout); - await using var registration = cancellationToken.Register(() => cts.Cancel()); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); await channel_.Writer.WaitToWriteAsync(cts.Token) .ConfigureAwait(false); From 274fd5d59f2a595d09d90ab9fe5120bd0311ac35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Sun, 10 Sep 2023 20:45:40 +0200 Subject: [PATCH 21/24] refactor: simpler use of async --- Common/tests/Pollster/TaskHandlerTest.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index 338378612..f04f59af8 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -1500,8 +1500,7 @@ public async Task CancelLongTaskShouldSucceed() await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - var exec = testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false); + var exec = testServiceProvider.TaskHandler.ExecuteTask(); // Cancel task for test @@ -1528,7 +1527,7 @@ await testServiceProvider.TaskHandler.StopCancelledTask() await testServiceProvider.TaskHandler.StopCancelledTask() .ConfigureAwait(false); - Assert.ThrowsAsync(async () => await exec); + Assert.ThrowsAsync(() => exec); Assert.AreEqual(TaskStatus.Cancelling, (await testServiceProvider.TaskTable.GetTaskStatus(new[] From 4287ae095b7b908c6e2a0fdab18f524c911a66fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 11 Sep 2023 07:00:54 +0200 Subject: [PATCH 22/24] fix: return task instead of ConfiguredTaskAwaitable --- Compute/PollingAgent/src/Program.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs index b25657403..de05597e6 100644 --- a/Compute/PollingAgent/src/Program.cs +++ b/Compute/PollingAgent/src/Program.cs @@ -192,8 +192,7 @@ public static async Task Main(string[] args) endpoints.MapGet("/stopcancelledtask", () => app.Services.GetRequiredService() - .StopCancelledTask() - .ConfigureAwait(false)); + .StopCancelledTask()); }); var pushQueueStorage = app.Services.GetRequiredService(); From d99eb9c79b80ba57579a8e2e72f8566f41e190ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Tue, 12 Sep 2023 19:04:13 +0200 Subject: [PATCH 23/24] refactor: create TaskQueueBase to factorize code --- .../src/Pollster/PostProcessingTaskQueue.cs | 42 +--------- Common/src/Pollster/RunningTaskQueue.cs | 58 +------------ Common/src/Pollster/TaskQueueBase.cs | 82 +++++++++++++++++++ 3 files changed, 88 insertions(+), 94 deletions(-) create mode 100644 Common/src/Pollster/TaskQueueBase.cs diff --git a/Common/src/Pollster/PostProcessingTaskQueue.cs b/Common/src/Pollster/PostProcessingTaskQueue.cs index e6d77cc51..ead604a65 100644 --- a/Common/src/Pollster/PostProcessingTaskQueue.cs +++ b/Common/src/Pollster/PostProcessingTaskQueue.cs @@ -15,48 +15,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; - namespace ArmoniK.Core.Common.Pollster; -public class PostProcessingTaskQueue +public sealed class PostProcessingTaskQueue : TaskQueueBase { - private readonly Channel channel_ = Channel.CreateBounded(new BoundedChannelOptions(1) - { - Capacity = 1, - FullMode = BoundedChannelFullMode.Wait, - SingleReader = true, - SingleWriter = true, - }); - - private readonly Queue exceptions_ = new(); - - public async Task WriteAsync(TaskHandler handler, - CancellationToken cancellationToken) - => await channel_.Writer.WriteAsync(handler, - cancellationToken) - .ConfigureAwait(false); - - public async Task ReadAsync(CancellationToken cancellationToken) - => await channel_.Reader.ReadAsync(cancellationToken) - .ConfigureAwait(false); - - public void AddException(Exception e) - => exceptions_.Enqueue(e); - - public bool RemoveException([MaybeNullWhen(false)] out Exception e) + public PostProcessingTaskQueue() + : base(true) { - var r = exceptions_.Count > 0; - - e = r - ? exceptions_.Dequeue() - : null; - - return r; } } diff --git a/Common/src/Pollster/RunningTaskQueue.cs b/Common/src/Pollster/RunningTaskQueue.cs index 3c86af9ed..995721330 100644 --- a/Common/src/Pollster/RunningTaskQueue.cs +++ b/Common/src/Pollster/RunningTaskQueue.cs @@ -15,64 +15,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; - namespace ArmoniK.Core.Common.Pollster; -public class RunningTaskQueue +public sealed class RunningTaskQueue : TaskQueueBase { - private readonly Channel channel_ = Channel.CreateBounded(new BoundedChannelOptions(1) - { - Capacity = 1, - FullMode = BoundedChannelFullMode.Wait, - SingleReader = false, - SingleWriter = true, - }); - - private readonly Queue exceptions_ = new(); - - public async Task WriteAsync(TaskHandler handler, - CancellationToken cancellationToken) - => await channel_.Writer.WriteAsync(handler, - cancellationToken) - .ConfigureAwait(false); - - public async Task WaitForNextWriteAsync(TimeSpan timeout, - CancellationToken cancellationToken) + public RunningTaskQueue() + : base(false) { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(timeout); - - await channel_.Writer.WaitToWriteAsync(cts.Token) - .ConfigureAwait(false); - - if (channel_.Reader.TryRead(out var handler)) - { - await handler.DisposeAsync() - .ConfigureAwait(false); - } - } - - public async Task ReadAsync(CancellationToken cancellationToken) - => await channel_.Reader.ReadAsync(cancellationToken) - .ConfigureAwait(false); - - public void AddException(Exception e) - => exceptions_.Enqueue(e); - - public bool RemoveException([MaybeNullWhen(false)] out Exception e) - { - var r = exceptions_.Count > 0; - - e = r - ? exceptions_.Dequeue() - : null; - - return r; } } diff --git a/Common/src/Pollster/TaskQueueBase.cs b/Common/src/Pollster/TaskQueueBase.cs new file mode 100644 index 000000000..03525b831 --- /dev/null +++ b/Common/src/Pollster/TaskQueueBase.cs @@ -0,0 +1,82 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace ArmoniK.Core.Common.Pollster; + +public abstract class TaskQueueBase +{ + private readonly Channel channel_; + + + private readonly Queue exceptions_ = new(); + + public TaskQueueBase(bool singleReader) + => channel_ = Channel.CreateBounded(new BoundedChannelOptions(1) + { + Capacity = 1, + FullMode = BoundedChannelFullMode.Wait, + SingleReader = singleReader, + SingleWriter = true, + }); + + public async Task WriteAsync(TaskHandler handler, + CancellationToken cancellationToken) + => await channel_.Writer.WriteAsync(handler, + cancellationToken) + .ConfigureAwait(false); + + public async Task WaitForNextWriteAsync(TimeSpan timeout, + CancellationToken cancellationToken) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + await channel_.Writer.WaitToWriteAsync(cts.Token) + .ConfigureAwait(false); + + if (channel_.Reader.TryRead(out var handler)) + { + await handler.DisposeAsync() + .ConfigureAwait(false); + } + } + + public async Task ReadAsync(CancellationToken cancellationToken) + => await channel_.Reader.ReadAsync(cancellationToken) + .ConfigureAwait(false); + + public void AddException(Exception e) + => exceptions_.Enqueue(e); + + public bool RemoveException([MaybeNullWhen(false)] out Exception e) + { + var r = exceptions_.Count > 0; + + e = r + ? exceptions_.Dequeue() + : null; + + return r; + } +} From 3f3a3990fbad6c1f8dd9ac95e16650e6f1b11dbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 13 Sep 2023 10:57:49 +0200 Subject: [PATCH 24/24] refactor: use const for number of retries --- .../TaskProcessingChecker/TaskProcessingCheckerClient.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs index 9f4bb0dfd..a0834723b 100644 --- a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs +++ b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs @@ -28,6 +28,7 @@ namespace ArmoniK.Core.Common.Pollster.TaskProcessingChecker; public class TaskProcessingCheckerClient : ITaskProcessingChecker { + private const int Retries = 5; private readonly IHttpClientFactory httpClientFactory_; private readonly ILogger logger_; @@ -45,7 +46,7 @@ public async Task Check(string taskId, logger_.LogTrace("Check if task is processing"); var client = httpClientFactory_.CreateClient(); - for (var i = 0; i < 5; i++) + for (var i = 0; i < Retries; i++) { try {