diff --git a/Adaptors/Memory/src/TaskTable.cs b/Adaptors/Memory/src/TaskTable.cs
index e3f1facef..7cc8bad27 100644
--- a/Adaptors/Memory/src/TaskTable.cs
+++ b/Adaptors/Memory/src/TaskTable.cs
@@ -110,12 +110,20 @@ public Task StartTask(TaskData taskData,
taskId2TaskData_.AddOrUpdate(taskData.TaskId,
_ => throw new TaskNotFoundException($"Key '{taskData.TaskId}' not found"),
(_,
- data) => data with
- {
- Status = TaskStatus.Processing,
- StartDate = taskData.StartDate,
- PodTtl = taskData.PodTtl,
- });
+ data) =>
+ {
+ if (data.Status is TaskStatus.Error or TaskStatus.Completed or TaskStatus.Retried or TaskStatus.Cancelled)
+ {
+ throw new TaskAlreadyInFinalStateException($"{taskData.TaskId} is already in a final state : {data.Status}");
+ }
+
+ return data with
+ {
+ Status = TaskStatus.Processing,
+ StartDate = taskData.StartDate,
+ PodTtl = taskData.PodTtl,
+ };
+ });
return Task.CompletedTask;
}
diff --git a/Adaptors/MongoDB/src/TaskTable.cs b/Adaptors/MongoDB/src/TaskTable.cs
index 8de871b35..ae4a94cdb 100644
--- a/Adaptors/MongoDB/src/TaskTable.cs
+++ b/Adaptors/MongoDB/src/TaskTable.cs
@@ -32,6 +32,7 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Storage;
+using ArmoniK.Utils;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
@@ -143,10 +144,11 @@ public async Task StartTask(TaskData taskData,
taskData.StartDate)
.Set(tdm => tdm.PodTtl,
taskData.PodTtl);
- Logger.LogInformation("update task {taskId} to status {status}",
+ Logger.LogInformation("Trying to start task {taskId} and update to status {status}",
taskData.TaskId,
TaskStatus.Processing);
- var res = await taskCollection.UpdateManyAsync(x => x.TaskId == taskData.TaskId && x.Status != TaskStatus.Completed && x.Status != TaskStatus.Cancelled,
+ var res = await taskCollection.UpdateManyAsync(x => x.TaskId == taskData.TaskId && x.Status != TaskStatus.Completed && x.Status != TaskStatus.Cancelled &&
+ x.Status != TaskStatus.Error && x.Status != TaskStatus.Retried,
updateDefinition,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
@@ -154,19 +156,19 @@ public async Task StartTask(TaskData taskData,
switch (res.MatchedCount)
{
case 0:
- var taskStatus = await GetTaskStatus(new[]
- {
- taskData.TaskId,
- },
- cancellationToken)
- .ConfigureAwait(false);
+ var taskStatus = (await GetTaskStatus(new[]
+ {
+ taskData.TaskId,
+ },
+ cancellationToken)
+ .ConfigureAwait(false)).AsICollection();
if (!taskStatus.Any())
{
throw new TaskNotFoundException($"Task {taskData.TaskId} not found");
}
- throw new ArmoniKException($"Task already in a terminal state - {taskStatus.Single()} to {TaskStatus.Processing}");
+ throw new TaskAlreadyInFinalStateException($"Task already in a terminal state - {taskStatus.Single()} to {TaskStatus.Processing}");
case > 1:
throw new ArmoniKException("Multiple tasks modified");
}
diff --git a/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs b/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs
new file mode 100644
index 000000000..8c54e70fb
--- /dev/null
+++ b/Common/src/Exceptions/TaskAlreadyInFinalStateException.cs
@@ -0,0 +1,40 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+using System;
+
+namespace ArmoniK.Core.Common.Exceptions;
+
+[Serializable]
+public class TaskAlreadyInFinalStateException : ArmoniKException
+{
+ public TaskAlreadyInFinalStateException()
+ {
+ }
+
+ public TaskAlreadyInFinalStateException(string message)
+ : base(message)
+ {
+ }
+
+ public TaskAlreadyInFinalStateException(string message,
+ Exception innerException)
+ : base(message,
+ innerException)
+ {
+ }
+}
diff --git a/Common/src/Injection/Options/Pollster.cs b/Common/src/Injection/Options/Pollster.cs
index bdead7fb6..8ecbc7d8a 100644
--- a/Common/src/Injection/Options/Pollster.cs
+++ b/Common/src/Injection/Options/Pollster.cs
@@ -44,4 +44,10 @@ public class Pollster
/// Negative values disable the check
///
public int MaxErrorAllowed { get; set; } = 5;
+
+ ///
+ /// Timeout before releasing the current acquired task and acquiring a new one
+ /// This happens in parallel of the execution of another task
+ ///
+ public TimeSpan TimeoutBeforeNextAcquisition { get; set; } = TimeSpan.FromSeconds(10);
}
diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs
index 9a600fff6..51f36961f 100644
--- a/Common/src/Pollster/Pollster.cs
+++ b/Common/src/Pollster/Pollster.cs
@@ -16,6 +16,7 @@
// along with this program. If not, see .
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
@@ -44,27 +45,29 @@ namespace ArmoniK.Core.Common.Pollster;
public class Pollster : IInitializable
{
- private readonly ActivitySource activitySource_;
- private readonly IAgentHandler agentHandler_;
- private readonly DataPrefetcher dataPrefetcher_;
- private readonly IHostApplicationLifetime lifeTime_;
- private readonly ILogger logger_;
- private readonly int messageBatchSize_;
- private readonly IObjectStorage objectStorage_;
- private readonly string ownerPodId_;
- private readonly string ownerPodName_;
- private readonly Injection.Options.Pollster pollsterOptions_;
- private readonly IPullQueueStorage pullQueueStorage_;
- private readonly IResultTable resultTable_;
- private readonly ISessionTable sessionTable_;
- private readonly ISubmitter submitter_;
- private readonly ITaskProcessingChecker taskProcessingChecker_;
- private readonly ITaskTable taskTable_;
- private readonly IWorkerStreamHandler workerStreamHandler_;
- private bool endLoopReached_;
- private HealthCheckResult? healthCheckFailedResult_;
- public Func? StopCancelledTask;
- public string TaskProcessing;
+ private readonly ActivitySource activitySource_;
+ private readonly IAgentHandler agentHandler_;
+ private readonly DataPrefetcher dataPrefetcher_;
+ private readonly IHostApplicationLifetime lifeTime_;
+ private readonly ILogger logger_;
+ private readonly ILoggerFactory loggerFactory_;
+ private readonly int messageBatchSize_;
+ private readonly IObjectStorage objectStorage_;
+ private readonly string ownerPodId_;
+ private readonly string ownerPodName_;
+ private readonly Injection.Options.Pollster pollsterOptions_;
+ private readonly IPullQueueStorage pullQueueStorage_;
+ private readonly IResultTable resultTable_;
+ private readonly RunningTaskQueue runningTaskQueue_;
+ private readonly ISessionTable sessionTable_;
+ private readonly ISubmitter submitter_;
+ private readonly ITaskProcessingChecker taskProcessingChecker_;
+ private readonly ConcurrentDictionary taskProcessingDict_ = new();
+ private readonly ITaskTable taskTable_;
+ private readonly IWorkerStreamHandler workerStreamHandler_;
+ private bool endLoopReached_;
+ private HealthCheckResult? healthCheckFailedResult_;
+
public Pollster(IPullQueueStorage pullQueueStorage,
DataPrefetcher dataPrefetcher,
@@ -73,6 +76,7 @@ public Pollster(IPullQueueStorage pullQueueStorage,
IHostApplicationLifetime lifeTime,
ActivitySource activitySource,
ILogger logger,
+ ILoggerFactory loggerFactory,
IObjectStorage objectStorage,
IResultTable resultTable,
ISubmitter submitter,
@@ -80,7 +84,8 @@ public Pollster(IPullQueueStorage pullQueueStorage,
ITaskTable taskTable,
ITaskProcessingChecker taskProcessingChecker,
IWorkerStreamHandler workerStreamHandler,
- IAgentHandler agentHandler)
+ IAgentHandler agentHandler,
+ RunningTaskQueue runningTaskQueue)
{
if (options.MessageBatchSize < 1)
{
@@ -89,6 +94,7 @@ public Pollster(IPullQueueStorage pullQueueStorage,
}
logger_ = logger;
+ loggerFactory_ = loggerFactory;
activitySource_ = activitySource;
pullQueueStorage_ = pullQueueStorage;
lifeTime_ = lifeTime;
@@ -103,12 +109,15 @@ public Pollster(IPullQueueStorage pullQueueStorage,
taskProcessingChecker_ = taskProcessingChecker;
workerStreamHandler_ = workerStreamHandler;
agentHandler_ = agentHandler;
- TaskProcessing = "";
+ runningTaskQueue_ = runningTaskQueue;
ownerPodId_ = LocalIpFinder.LocalIpv4Address();
ownerPodName_ = Dns.GetHostName();
Failed = false;
}
+ public ICollection TaskProcessing
+ => taskProcessingDict_.Keys;
+
///
/// Is true when the MainLoop exited with an error
/// Used in Unit tests
@@ -194,6 +203,15 @@ public async Task Check(HealthCheckTag tag)
return result;
}
+ public async Task StopCancelledTask()
+ {
+ foreach (var taskHandler in taskProcessingDict_.Values)
+ {
+ await taskHandler.StopCancelledTask()
+ .ConfigureAwait(false);
+ }
+ }
+
public async Task MainLoop(CancellationToken cancellationToken)
{
await Init(cancellationToken)
@@ -251,11 +269,12 @@ void RecordError(Exception e)
await foreach (var message in messages.ConfigureAwait(false))
{
- using var scopedLogger = logger_.BeginNamedScope("Prefetch messageHandler",
- ("messageHandler", message.MessageId),
- ("taskId", message.TaskId),
- ("ownerPodId", ownerPodId_));
- TaskProcessing = message.TaskId;
+ var taskHandlerLogger = loggerFactory_.CreateLogger();
+ using var _ = taskHandlerLogger.BeginNamedScope("Prefetch messageHandler",
+ ("messageHandler", message.MessageId),
+ ("taskId", message.TaskId),
+ ("ownerPodId", ownerPodId_));
+
// ReSharper disable once ExplicitCallerInfoArgument
using var activity = activitySource_.StartActivity("ProcessQueueMessage");
activity?.SetBaggage("TaskId",
@@ -263,43 +282,73 @@ void RecordError(Exception e)
activity?.SetBaggage("messageId",
message.MessageId);
- logger_.LogDebug("Start a new Task to process the messageHandler");
+ taskHandlerLogger.LogDebug("Start a new Task to process the messageHandler");
- try
+ while (runningTaskQueue_.RemoveException(out var exception))
+ {
+ if (exception is RpcException rpcException && TaskHandler.IsStatusFatal(rpcException.StatusCode))
+ {
+ // This exception should stop pollster
+ exception.RethrowWithStacktrace();
+ }
+
+ RecordError(exception);
+ }
+
+ var taskHandler = new TaskHandler(sessionTable_,
+ taskTable_,
+ resultTable_,
+ submitter_,
+ dataPrefetcher_,
+ workerStreamHandler_,
+ message,
+ taskProcessingChecker_,
+ ownerPodId_,
+ ownerPodName_,
+ activitySource_,
+ agentHandler_,
+ taskHandlerLogger,
+ pollsterOptions_,
+ () => taskProcessingDict_.TryRemove(message.TaskId,
+ out var _),
+ cts);
+
+ if (!taskProcessingDict_.TryAdd(message.TaskId,
+ taskHandler))
{
- await using var taskHandler = new TaskHandler(sessionTable_,
- taskTable_,
- resultTable_,
- submitter_,
- dataPrefetcher_,
- workerStreamHandler_,
- message,
- taskProcessingChecker_,
- ownerPodId_,
- ownerPodName_,
- activitySource_,
- agentHandler_,
- logger_,
- pollsterOptions_,
- cts);
-
- StopCancelledTask = taskHandler.StopCancelledTask;
+ message.Status = QueueMessageStatus.Processed;
+ await taskHandler.DisposeAsync()
+ .ConfigureAwait(false);
+ continue;
+ }
+
+ try
+ {
var precondition = await taskHandler.AcquireTask()
.ConfigureAwait(false);
if (precondition)
{
- await taskHandler.PreProcessing()
- .ConfigureAwait(false);
-
- await taskHandler.ExecuteTask()
- .ConfigureAwait(false);
+ try
+ {
+ await taskHandler.PreProcessing()
+ .ConfigureAwait(false);
+ }
+ catch
+ {
+ await taskHandler.DisposeAsync()
+ .ConfigureAwait(false);
+ throw;
+ }
- await taskHandler.PostProcessing()
- .ConfigureAwait(false);
+ await runningTaskQueue_.WriteAsync(taskHandler,
+ cancellationToken)
+ .ConfigureAwait(false);
- StopCancelledTask = null;
+ await runningTaskQueue_.WaitForNextWriteAsync(pollsterOptions_.TimeoutBeforeNextAcquisition,
+ cancellationToken)
+ .ConfigureAwait(false);
// If the task was successful, we can remove a failure
if (recordedErrors.Count > 0)
@@ -307,6 +356,11 @@ await taskHandler.PostProcessing()
recordedErrors.Dequeue();
}
}
+ else
+ {
+ await taskHandler.DisposeAsync()
+ .ConfigureAwait(false);
+ }
}
catch (RpcException e) when (TaskHandler.IsStatusFatal(e.StatusCode))
{
@@ -317,11 +371,6 @@ await taskHandler.PostProcessing()
{
RecordError(e);
}
- finally
- {
- StopCancelledTask = null;
- TaskProcessing = string.Empty;
- }
}
}
catch (RpcException e) when (e.StatusCode == StatusCode.Unavailable)
diff --git a/Common/src/Pollster/PostProcessingTaskQueue.cs b/Common/src/Pollster/PostProcessingTaskQueue.cs
new file mode 100644
index 000000000..ead604a65
--- /dev/null
+++ b/Common/src/Pollster/PostProcessingTaskQueue.cs
@@ -0,0 +1,26 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+namespace ArmoniK.Core.Common.Pollster;
+
+public sealed class PostProcessingTaskQueue : TaskQueueBase
+{
+ public PostProcessingTaskQueue()
+ : base(true)
+ {
+ }
+}
diff --git a/Common/src/Pollster/PostProcessor.cs b/Common/src/Pollster/PostProcessor.cs
new file mode 100644
index 000000000..b207d0939
--- /dev/null
+++ b/Common/src/Pollster/PostProcessor.cs
@@ -0,0 +1,60 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+using System;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Microsoft.Extensions.Hosting;
+
+namespace ArmoniK.Core.Common.Pollster;
+
+public class PostProcessor : BackgroundService
+{
+ private readonly PostProcessingTaskQueue postProcessingTaskQueue_;
+ public string CurrentTask = string.Empty;
+
+ public PostProcessor(PostProcessingTaskQueue postProcessingTaskQueue)
+ => postProcessingTaskQueue_ = postProcessingTaskQueue;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ var taskHandler = await postProcessingTaskQueue_.ReadAsync(stoppingToken)
+ .ConfigureAwait(false);
+ try
+ {
+ CurrentTask = taskHandler.GetAcquiredTask();
+ await taskHandler.PostProcessing()
+ .ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ postProcessingTaskQueue_.AddException(ExceptionDispatchInfo.Capture(e)
+ .SourceException);
+ }
+ finally
+ {
+ await taskHandler.DisposeAsync()
+ .ConfigureAwait(false);
+ CurrentTask = string.Empty;
+ }
+ }
+ }
+}
diff --git a/Common/src/Pollster/RunningTaskProcessor.cs b/Common/src/Pollster/RunningTaskProcessor.cs
new file mode 100644
index 000000000..c8ca42e9a
--- /dev/null
+++ b/Common/src/Pollster/RunningTaskProcessor.cs
@@ -0,0 +1,85 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+using System;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace ArmoniK.Core.Common.Pollster;
+
+public class RunningTaskProcessor : BackgroundService
+{
+ private readonly ILogger logger_;
+ private readonly PostProcessingTaskQueue postProcessingTaskQueue_;
+ private readonly RunningTaskQueue runningTaskQueue_;
+ public string CurrentTask = string.Empty;
+
+ public RunningTaskProcessor(RunningTaskQueue runningTaskQueue,
+ PostProcessingTaskQueue postProcessingTaskQueue,
+ ILogger logger)
+ {
+ runningTaskQueue_ = runningTaskQueue;
+ postProcessingTaskQueue_ = postProcessingTaskQueue;
+ logger_ = logger;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger_.LogDebug("Start running task processing service");
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ while (postProcessingTaskQueue_.RemoveException(out var exception))
+ {
+ runningTaskQueue_.AddException(exception);
+ }
+
+ var taskHandler = await runningTaskQueue_.ReadAsync(stoppingToken)
+ .ConfigureAwait(false);
+ try
+ {
+ CurrentTask = taskHandler.GetAcquiredTask();
+ await taskHandler.ExecuteTask()
+ .ConfigureAwait(false);
+ await postProcessingTaskQueue_.WriteAsync(taskHandler,
+ stoppingToken)
+ .ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ await taskHandler.DisposeAsync()
+ .ConfigureAwait(false);
+ throw;
+ }
+ }
+ catch (Exception e)
+ {
+ runningTaskQueue_.AddException(ExceptionDispatchInfo.Capture(e)
+ .SourceException);
+ }
+ finally
+ {
+ CurrentTask = string.Empty;
+ }
+ }
+ }
+}
diff --git a/Common/src/Pollster/RunningTaskQueue.cs b/Common/src/Pollster/RunningTaskQueue.cs
new file mode 100644
index 000000000..995721330
--- /dev/null
+++ b/Common/src/Pollster/RunningTaskQueue.cs
@@ -0,0 +1,26 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+namespace ArmoniK.Core.Common.Pollster;
+
+public sealed class RunningTaskQueue : TaskQueueBase
+{
+ public RunningTaskQueue()
+ : base(false)
+ {
+ }
+}
diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs
index 6d58fc8e7..35814cd7b 100644
--- a/Common/src/Pollster/TaskHandler.cs
+++ b/Common/src/Pollster/TaskHandler.cs
@@ -47,8 +47,10 @@ public sealed class TaskHandler : IAsyncDisposable
private readonly IAgentHandler agentHandler_;
private readonly CancellationTokenSource cancellationTokenSource_;
private readonly DataPrefetcher dataPrefetcher_;
+ private readonly TimeSpan delayBeforeAcquisition_;
private readonly ILogger logger_;
private readonly IQueueMessageHandler messageHandler_;
+ private readonly Action onDispose_;
private readonly string ownerPodId_;
private readonly string ownerPodName_;
private readonly CancellationTokenRegistration reg1_;
@@ -62,6 +64,7 @@ public sealed class TaskHandler : IAsyncDisposable
private readonly IWorkerStreamHandler workerStreamHandler_;
private IAgent? agent_;
private Queue? computeRequestStream_;
+ private ProcessReply? reply_;
private SessionData? sessionData_;
private TaskData? taskData_;
@@ -79,6 +82,7 @@ public TaskHandler(ISessionTable sessionTable,
IAgentHandler agentHandler,
ILogger logger,
Injection.Options.Pollster pollsterOptions,
+ Action onDispose,
CancellationTokenSource cancellationTokenSource)
{
sessionTable_ = sessionTable;
@@ -92,12 +96,14 @@ public TaskHandler(ISessionTable sessionTable,
activitySource_ = activitySource;
agentHandler_ = agentHandler;
logger_ = logger;
+ onDispose_ = onDispose;
ownerPodId_ = ownerPodId;
ownerPodName_ = ownerPodName;
taskData_ = null;
sessionData_ = null;
token_ = Guid.NewGuid()
.ToString();
+ delayBeforeAcquisition_ = pollsterOptions.TimeoutBeforeNextAcquisition + TimeSpan.FromSeconds(2);
workerConnectionCts_ = new CancellationTokenSource();
cancellationTokenSource_ = new CancellationTokenSource();
@@ -119,8 +125,10 @@ public async ValueTask DisposeAsync()
{
using var _ = logger_.BeginNamedScope("DisposeAsync",
("taskId", messageHandler_.TaskId),
+ ("messageHandler", messageHandler_.MessageId),
("sessionId", taskData_?.SessionId ?? ""));
+ onDispose_.Invoke();
logger_.LogDebug("MessageHandler status is {status}",
messageHandler_.Status);
await messageHandler_.DisposeAsync()
@@ -165,6 +173,7 @@ public async Task AcquireTask()
{
using var activity = activitySource_.StartActivity($"{nameof(AcquireTask)}");
using var _ = logger_.BeginNamedScope("Acquiring task",
+ ("messageHandler", messageHandler_.MessageId),
("taskId", messageHandler_.TaskId));
try
@@ -295,13 +304,6 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
return false;
}
- if (cancellationTokenSource_.IsCancellationRequested)
- {
- messageHandler_.Status = QueueMessageStatus.Postponed;
- logger_.LogDebug("Dependencies resolved but execution cancellation requested");
- return false;
- }
-
taskData_ = taskData_ with
{
OwnerPodId = ownerPodId_,
@@ -352,6 +354,15 @@ await taskTable_.ReleaseTask(taskData_,
.ConfigureAwait(false);
logger_.LogInformation("Task is not running on the other polling agent, status : {status}",
taskData_.Status);
+
+ if (taskData_.Status is TaskStatus.Dispatched && taskData_.AcquisitionDate < DateTime.UtcNow + delayBeforeAcquisition_)
+
+ {
+ messageHandler_.Status = QueueMessageStatus.Postponed;
+ logger_.LogDebug("Wait to exceed acquisition timeout before resubmitting task");
+ return false;
+ }
+
if (taskData_.Status is TaskStatus.Processing or TaskStatus.Dispatched or TaskStatus.Processed)
{
logger_.LogDebug("Resubmitting task {task} on another pod",
@@ -369,6 +380,7 @@ await submitter_.CompleteTaskAsync(taskData_,
.ConfigureAwait(false);
}
+
if (taskData_.Status is TaskStatus.Cancelling)
{
messageHandler_.Status = QueueMessageStatus.Cancelled;
@@ -395,6 +407,13 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
return false;
}
+ if (taskData_.OwnerPodId == ownerPodId_ && taskData_.Status != TaskStatus.Dispatched)
+ {
+ logger_.LogInformation("Task is already managed by this agent; message likely to be duplicated");
+ messageHandler_.Status = QueueMessageStatus.Processed;
+ return false;
+ }
+
if (cancellationTokenSource_.IsCancellationRequested)
{
logger_.LogDebug("Task preconditions ok but execution cancellation requested");
@@ -444,6 +463,7 @@ public async Task PreProcessing()
}
using var _ = logger_.BeginNamedScope("PreProcessing",
+ ("messageHandler", messageHandler_.MessageId),
("taskId", messageHandler_.TaskId),
("sessionId", taskData_.SessionId));
logger_.LogDebug("Start prefetch data");
@@ -479,6 +499,7 @@ public async Task ExecuteTask()
}
using var _ = logger_.BeginNamedScope("TaskExecution",
+ ("messageHandler", messageHandler_.MessageId),
("taskId", messageHandler_.TaskId),
("sessionId", taskData_.SessionId));
@@ -506,6 +527,7 @@ await taskTable_.StartTask(taskData_,
workerStreamHandler_.StartTaskProcessing(taskData_,
workerConnectionCts_.Token);
+
if (workerStreamHandler_.Pipe is null)
{
throw new ArmoniKException($"{nameof(IWorkerStreamHandler.Pipe)} should not be null");
@@ -524,6 +546,13 @@ await workerStreamHandler_.Pipe.WriteAsync(new ProcessRequest
await workerStreamHandler_.Pipe.CompleteAsync()
.ConfigureAwait(false);
}
+ catch (TaskAlreadyInFinalStateException e)
+ {
+ messageHandler_.Status = QueueMessageStatus.Processed;
+ logger_.LogWarning(e,
+ "Task already in a final state, removing it from the queue");
+ throw;
+ }
catch (Exception e)
{
await HandleErrorRequeueAsync(e,
@@ -531,6 +560,25 @@ await HandleErrorRequeueAsync(e,
cancellationTokenSource_.Token)
.ConfigureAwait(false);
}
+
+ try
+ {
+ // at this point worker requests should have ended
+ logger_.LogDebug("Wait for task output");
+ reply_ = await workerStreamHandler_.Pipe!.ReadAsync(workerConnectionCts_.Token)
+ .ConfigureAwait(false);
+
+ logger_.LogDebug("Stop agent server");
+ await agentHandler_.Stop(workerConnectionCts_.Token)
+ .ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ await HandleErrorResubmitAsync(e,
+ taskData_,
+ cancellationTokenSource_.Token)
+ .ConfigureAwait(false);
+ }
}
///
@@ -557,25 +605,22 @@ public async Task PostProcessing()
throw new NullReferenceException(nameof(agent_) + " is null.");
}
+ if (reply_ is null)
+ {
+ throw new NullReferenceException(nameof(reply_) + " is null.");
+ }
+
using var _ = logger_.BeginNamedScope("PostProcessing",
+ ("messageHandler", messageHandler_.MessageId),
("taskId", messageHandler_.TaskId),
("sessionId", taskData_.SessionId));
try
{
- // at this point worker requests should have ended
- logger_.LogDebug("Wait for task output");
- var reply = await workerStreamHandler_.Pipe.ReadAsync(workerConnectionCts_.Token)
- .ConfigureAwait(false);
-
- logger_.LogDebug("Stop agent server");
- await agentHandler_.Stop(workerConnectionCts_.Token)
- .ConfigureAwait(false);
-
logger_.LogInformation("Process task output of type {type}",
- reply.Output.TypeCase);
+ reply_.Output.TypeCase);
- if (reply.Output.TypeCase is Output.TypeOneofCase.Ok)
+ if (reply_.Output.TypeCase is Output.TypeOneofCase.Ok)
{
logger_.LogDebug("Complete processing of the request");
await agent_.FinalizeTaskCreation(CancellationToken.None)
@@ -584,7 +629,7 @@ await agent_.FinalizeTaskCreation(CancellationToken.None)
await submitter_.CompleteTaskAsync(taskData_,
false,
- reply.Output,
+ reply_.Output,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Processed;
diff --git a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs
index af2f3f5f3..a0834723b 100644
--- a/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs
+++ b/Common/src/Pollster/TaskProcessingChecker/TaskProcessingCheckerClient.cs
@@ -16,7 +16,9 @@
// along with this program. If not, see .
using System;
+using System.Linq;
using System.Net.Http;
+using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -26,6 +28,7 @@ namespace ArmoniK.Core.Common.Pollster.TaskProcessingChecker;
public class TaskProcessingCheckerClient : ITaskProcessingChecker
{
+ private const int Retries = 5;
private readonly IHttpClientFactory httpClientFactory_;
private readonly ILogger logger_;
@@ -43,32 +46,47 @@ public async Task Check(string taskId,
logger_.LogTrace("Check if task is processing");
var client = httpClientFactory_.CreateClient();
- try
+ for (var i = 0; i < Retries; i++)
{
- var result = await client.GetStringAsync("http://" + ownerPodId + ":1080/taskprocessing",
- cancellationToken)
- .ConfigureAwait(false);
- logger_.LogDebug("Result from other polling agent: {result}",
- result);
- return result.Equals(taskId);
- }
- catch (InvalidOperationException ex)
- {
- logger_.LogWarning(ex,
- "Cannot communicate with other pod");
- return false;
- }
- catch (HttpRequestException ex)
- {
- logger_.LogWarning(ex,
- "Cannot communicate with other pod");
- return false;
- }
- catch (UriFormatException ex)
- {
- logger_.LogWarning(ex,
- "Invalid other pod hostname");
- return false;
+ try
+ {
+ var result = await client.GetStringAsync("http://" + ownerPodId + ":1080/taskprocessing",
+ cancellationToken)
+ .ConfigureAwait(false);
+ logger_.LogDebug("Result from other polling agent: {result}",
+ result);
+ return result.Split(",")
+ .Contains(taskId);
+ }
+ catch (InvalidOperationException ex)
+ {
+ logger_.LogWarning(ex,
+ "Cannot communicate with other pod");
+ return false;
+ }
+ catch (HttpRequestException ex) when (ex.InnerException is SocketException
+ {
+ SocketErrorCode: SocketError.ConnectionRefused,
+ })
+ {
+ logger_.LogWarning(ex,
+ "Cannot communicate with other pod");
+ }
+ catch (HttpRequestException ex)
+ {
+ logger_.LogWarning(ex,
+ "Cannot communicate with other pod");
+ return false;
+ }
+ catch (UriFormatException ex)
+ {
+ logger_.LogWarning(ex,
+ "Invalid other pod hostname");
+ return false;
+ }
}
+
+ logger_.LogWarning("Too many tries to communicate with other pod");
+ return false;
}
}
diff --git a/Common/src/Pollster/TaskQueueBase.cs b/Common/src/Pollster/TaskQueueBase.cs
new file mode 100644
index 000000000..03525b831
--- /dev/null
+++ b/Common/src/Pollster/TaskQueueBase.cs
@@ -0,0 +1,82 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+
+namespace ArmoniK.Core.Common.Pollster;
+
+public abstract class TaskQueueBase
+{
+ private readonly Channel channel_;
+
+
+ private readonly Queue exceptions_ = new();
+
+ public TaskQueueBase(bool singleReader)
+ => channel_ = Channel.CreateBounded(new BoundedChannelOptions(1)
+ {
+ Capacity = 1,
+ FullMode = BoundedChannelFullMode.Wait,
+ SingleReader = singleReader,
+ SingleWriter = true,
+ });
+
+ public async Task WriteAsync(TaskHandler handler,
+ CancellationToken cancellationToken)
+ => await channel_.Writer.WriteAsync(handler,
+ cancellationToken)
+ .ConfigureAwait(false);
+
+ public async Task WaitForNextWriteAsync(TimeSpan timeout,
+ CancellationToken cancellationToken)
+ {
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(timeout);
+
+ await channel_.Writer.WaitToWriteAsync(cts.Token)
+ .ConfigureAwait(false);
+
+ if (channel_.Reader.TryRead(out var handler))
+ {
+ await handler.DisposeAsync()
+ .ConfigureAwait(false);
+ }
+ }
+
+ public async Task ReadAsync(CancellationToken cancellationToken)
+ => await channel_.Reader.ReadAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ public void AddException(Exception e)
+ => exceptions_.Enqueue(e);
+
+ public bool RemoveException([MaybeNullWhen(false)] out Exception e)
+ {
+ var r = exceptions_.Count > 0;
+
+ e = r
+ ? exceptions_.Dequeue()
+ : null;
+
+ return r;
+ }
+}
diff --git a/Common/tests/Helpers/SimplePullQueueStorageChannel.cs b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs
new file mode 100644
index 000000000..83c5b5949
--- /dev/null
+++ b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs
@@ -0,0 +1,73 @@
+// This file is part of the ArmoniK project
+//
+// Copyright (C) ANEO, 2021-2023. All rights reserved.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY, without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+
+using ArmoniK.Core.Base;
+using ArmoniK.Core.Base.DataStructures;
+
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+
+namespace ArmoniK.Core.Common.Tests.Helpers;
+
+public class SimplePullQueueStorageChannel : IPullQueueStorage
+{
+ public readonly Channel Channel = System.Threading.Channels.Channel.CreateUnbounded();
+
+ public Task Check(HealthCheckTag tag)
+ => Task.FromResult(HealthCheckResult.Healthy());
+
+ public Task Init(CancellationToken cancellationToken)
+ => Task.CompletedTask;
+
+ public int MaxPriority
+ => 10;
+
+
+ public async IAsyncEnumerable PullMessagesAsync(int nbMessages,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ foreach (var _ in Enumerable.Range(0,
+ nbMessages))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ yield break;
+ }
+
+ IQueueMessageHandler? msg;
+
+ try
+ {
+ msg = await Channel.Reader.ReadAsync(cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ yield break;
+ }
+
+ yield return msg;
+ }
+ }
+}
diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs
index 4e478c994..0f9e31eec 100644
--- a/Common/tests/Helpers/TestPollsterProvider.cs
+++ b/Common/tests/Helpers/TestPollsterProvider.cs
@@ -36,6 +36,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@@ -123,12 +124,17 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
NullLogger.Instance)
.AddSingleton(ActivitySource)
.AddSingleton(_ => client_)
+ .AddLogging()
.AddSingleton()
.AddOption(builder.Configuration,
Injection.Options.Submitter.SettingSection)
.AddSingleton()
.AddSingleton("ownerpodid")
.AddSingleton()
+ .AddHostedService()
+ .AddHostedService()
+ .AddSingleton()
+ .AddSingleton()
.AddSingleton()
.AddSingleton()
.AddOption(builder.Configuration,
@@ -141,6 +147,7 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
builder.Services.AddSingleton(computePlanOptions);
app_ = builder.Build();
+ app_.Start();
ResultTable = app_.Services.GetRequiredService();
TaskTable = app_.Services.GetRequiredService();
@@ -164,6 +171,8 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
public void Dispose()
{
+ app_.StopAsync()
+ .Wait();
((IDisposable)app_)?.Dispose();
runner_?.Dispose();
GC.SuppressFinalize(this);
diff --git a/Common/tests/Helpers/TestTaskHandlerProvider.cs b/Common/tests/Helpers/TestTaskHandlerProvider.cs
index d0521bee9..077c3ba67 100644
--- a/Common/tests/Helpers/TestTaskHandlerProvider.cs
+++ b/Common/tests/Helpers/TestTaskHandlerProvider.cs
@@ -135,15 +135,27 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler,
Injection.Options.Submitter.SettingSection)
.AddOption(builder.Configuration,
Injection.Options.Pollster.SettingSection)
- .AddSingleton(cancellationTokenSource)
.AddSingleton()
- .AddSingleton("ownerpodid")
- .AddSingleton()
+ .AddSingleton(provider => new TaskHandler(provider.GetRequiredService(),
+ provider.GetRequiredService(),
+ provider.GetRequiredService(),
+ provider.GetRequiredService(),
+ provider.GetRequiredService(),
+ workerStreamHandler,
+ queueStorage,
+ provider.GetRequiredService(),
+ "ownerpodid",
+ "ownerpodname",
+ provider.GetRequiredService(),
+ agentHandler,
+ provider.GetRequiredService(),
+ provider.GetRequiredService(),
+ () =>
+ {
+ },
+ cancellationTokenSource))
.AddSingleton()
- .AddSingleton()
- .AddSingleton(workerStreamHandler)
- .AddSingleton(agentHandler)
- .AddSingleton(queueStorage);
+ .AddSingleton();
if (inputTaskTable is not null)
{
diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs
index be9da9a37..bc4b62a63 100644
--- a/Common/tests/Pollster/PollsterTest.cs
+++ b/Common/tests/Pollster/PollsterTest.cs
@@ -402,10 +402,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
Assert.True(source.Token.IsCancellationRequested);
- Assert.AreEqual(string.Empty,
+ Assert.AreEqual(Array.Empty(),
testServiceProvider.Pollster.TaskProcessing);
- Assert.AreSame(string.Empty,
- testServiceProvider.Pollster.TaskProcessing);
}
public class WaitWorkerStreamHandler : IWorkerStreamHandler
@@ -468,13 +466,13 @@ public Task CompleteAsync()
[TestCase(5000)] // task should be longer than the grace delay
public async Task ExecuteTaskShouldSucceed(double delay)
{
- var mockPullQueueStorage = new Mock();
+ var mockPullQueueStorage = new SimplePullQueueStorageChannel();
var waitWorkerStreamHandler = new WaitWorkerStreamHandler(delay);
var simpleAgentHandler = new SimpleAgentHandler();
using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler,
simpleAgentHandler,
- mockPullQueueStorage.Object);
+ mockPullQueueStorage);
var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter,
testServiceProvider.PartitionTable,
@@ -482,19 +480,15 @@ public async Task ExecuteTaskShouldSucceed(double delay)
CancellationToken.None)
.ConfigureAwait(false);
- mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(),
- It.IsAny()))
- .Returns(() => new List
- {
- new SimpleQueueMessageHandler
- {
- CancellationToken = CancellationToken.None,
- Status = QueueMessageStatus.Waiting,
- MessageId = Guid.NewGuid()
- .ToString(),
- TaskId = taskSubmitted,
- },
- }.ToAsyncEnumerable());
+ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler
+ {
+ CancellationToken = CancellationToken.None,
+ Status = QueueMessageStatus.Waiting,
+ MessageId = Guid.NewGuid()
+ .ToString(),
+ TaskId = taskSubmitted,
+ })
+ .ConfigureAwait(false);
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);
@@ -505,7 +499,9 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
Assert.False(testServiceProvider.Pollster.Failed);
Assert.True(source.Token.IsCancellationRequested);
- Assert.AreEqual(TaskStatus.Completed,
+ Assert.AreEqual(delay < 1000
+ ? TaskStatus.Completed
+ : TaskStatus.Processing,
(await testServiceProvider.TaskTable.GetTaskStatus(new[]
{
taskSubmitted,
@@ -513,22 +509,18 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
CancellationToken.None)
.ConfigureAwait(false)).Single()
.Status);
- Assert.AreEqual(string.Empty,
- testServiceProvider.Pollster.TaskProcessing);
- Assert.AreSame(string.Empty,
- testServiceProvider.Pollster.TaskProcessing);
}
[Test]
public async Task CancelLongTaskShouldSucceed()
{
- var mockPullQueueStorage = new Mock();
+ var mockPullQueueStorage = new SimplePullQueueStorageChannel();
var waitWorkerStreamHandler = new ExceptionWorkerStreamHandler(15000);
var simpleAgentHandler = new SimpleAgentHandler();
using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler,
simpleAgentHandler,
- mockPullQueueStorage.Object);
+ mockPullQueueStorage);
var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter,
testServiceProvider.PartitionTable,
@@ -536,19 +528,15 @@ public async Task CancelLongTaskShouldSucceed()
CancellationToken.None)
.ConfigureAwait(false);
- mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(),
- It.IsAny()))
- .Returns(() => new List
- {
- new SimpleQueueMessageHandler
- {
- CancellationToken = CancellationToken.None,
- Status = QueueMessageStatus.Waiting,
- MessageId = Guid.NewGuid()
- .ToString(),
- TaskId = taskSubmitted,
- },
- }.ToAsyncEnumerable());
+ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler
+ {
+ CancellationToken = CancellationToken.None,
+ Status = QueueMessageStatus.Waiting,
+ MessageId = Guid.NewGuid()
+ .ToString(),
+ TaskId = taskSubmitted,
+ })
+ .ConfigureAwait(false);
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);
@@ -572,25 +560,25 @@ await Task.Delay(TimeSpan.FromMilliseconds(200),
CancellationToken.None)
.ConfigureAwait(false);
- await testServiceProvider.Pollster.StopCancelledTask!.Invoke()
+ await testServiceProvider.Pollster.StopCancelledTask()
.ConfigureAwait(false);
Assert.DoesNotThrowAsync(() => mainLoopTask);
Assert.False(testServiceProvider.Pollster.Failed);
Assert.True(source.Token.IsCancellationRequested);
- Assert.AreEqual(TaskStatus.Cancelled,
- (await testServiceProvider.TaskTable.GetTaskStatus(new[]
- {
- taskSubmitted,
- },
- CancellationToken.None)
- .ConfigureAwait(false)).Single()
- .Status);
- Assert.AreEqual(string.Empty,
+ Assert.That((await testServiceProvider.TaskTable.GetTaskStatus(new[]
+ {
+ taskSubmitted,
+ },
+ CancellationToken.None)
+ .ConfigureAwait(false)).Single()
+ .Status,
+ Is.AnyOf(TaskStatus.Cancelled,
+ TaskStatus.Cancelling));
+
+ Assert.AreEqual(Array.Empty(),
testServiceProvider.Pollster.TaskProcessing);
- Assert.AreSame(string.Empty,
- testServiceProvider.Pollster.TaskProcessing);
}
public static IEnumerable ExecuteTooManyErrorShouldFailTestCase
@@ -662,17 +650,15 @@ await pollster.Init(CancellationToken.None)
Assert.DoesNotThrowAsync(() => pollster.MainLoop(source.Token));
Assert.True(pollster.Failed);
Assert.False(source.Token.IsCancellationRequested);
- Assert.AreEqual(string.Empty,
- pollster.TaskProcessing);
- Assert.AreSame(string.Empty,
- pollster.TaskProcessing);
+ Assert.AreEqual(Array.Empty(),
+ testServiceProvider.Pollster.TaskProcessing);
}
[Test]
public async Task UnavailableWorkerShouldFail()
{
- var mockPullQueueStorage = new Mock();
+ var mockPullQueueStorage = new SimplePullQueueStorageChannel();
var simpleAgentHandler = new SimpleAgentHandler();
var mockStreamHandlerFail = new Mock();
@@ -683,7 +669,7 @@ public async Task UnavailableWorkerShouldFail()
using var testServiceProvider = new TestPollsterProvider(mockStreamHandlerFail.Object,
simpleAgentHandler,
- mockPullQueueStorage.Object);
+ mockPullQueueStorage);
var (_, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter,
testServiceProvider.PartitionTable,
@@ -691,19 +677,15 @@ public async Task UnavailableWorkerShouldFail()
CancellationToken.None)
.ConfigureAwait(false);
- mockPullQueueStorage.Setup(storage => storage.PullMessagesAsync(It.IsAny(),
- It.IsAny()))
- .Returns(() => new List
- {
- new SimpleQueueMessageHandler
- {
- CancellationToken = CancellationToken.None,
- Status = QueueMessageStatus.Waiting,
- MessageId = Guid.NewGuid()
- .ToString(),
- TaskId = taskSubmitted,
- },
- }.ToAsyncEnumerable());
+ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler
+ {
+ CancellationToken = CancellationToken.None,
+ Status = QueueMessageStatus.Waiting,
+ MessageId = Guid.NewGuid()
+ .ToString(),
+ TaskId = taskSubmitted,
+ })
+ .ConfigureAwait(false);
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);
@@ -711,8 +693,6 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
- Assert.True(testServiceProvider.Pollster.Failed);
- Assert.False(source.Token.IsCancellationRequested);
Assert.AreEqual(TaskStatus.Submitted,
(await testServiceProvider.TaskTable.GetTaskStatus(new[]
@@ -722,9 +702,7 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
CancellationToken.None)
.ConfigureAwait(false)).Single()
.Status);
- Assert.AreEqual(string.Empty,
+ Assert.AreEqual(Array.Empty(),
testServiceProvider.Pollster.TaskProcessing);
- Assert.AreSame(string.Empty,
- testServiceProvider.Pollster.TaskProcessing);
}
}
diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs
index 967dea8b4..f04f59af8 100644
--- a/Common/tests/Pollster/TaskHandlerTest.cs
+++ b/Common/tests/Pollster/TaskHandlerTest.cs
@@ -1116,12 +1116,10 @@ public static IEnumerable TestCaseOuptut
await testServiceProvider.TaskHandler.PreProcessing()
.ConfigureAwait(false);
- await testServiceProvider.TaskHandler.ExecuteTask()
- .ConfigureAwait(false);
-
cancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(1500));
- Assert.ThrowsAsync(() => testServiceProvider.TaskHandler.PostProcessing());
+ Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask()
+ .ConfigureAwait(false));
return ((await testServiceProvider.TaskTable.GetTaskStatus(new[]
{
@@ -1230,11 +1228,15 @@ public async Task ExecuteTaskUntilErrorShouldSucceed()
await testServiceProvider.TaskHandler.PreProcessing()
.ConfigureAwait(false);
- await testServiceProvider.TaskHandler.ExecuteTask()
- .ConfigureAwait(false);
- Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.PostProcessing()
- .ConfigureAwait(false));
+ Assert.ThrowsAsync(async () =>
+ {
+ await testServiceProvider.TaskHandler.ExecuteTask()
+ .ConfigureAwait(false);
+
+ await testServiceProvider.TaskHandler.PostProcessing()
+ .ConfigureAwait(false);
+ });
taskData = await testServiceProvider.TaskTable.ReadTaskAsync(taskId,
@@ -1376,10 +1378,7 @@ public async Task ExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow<
await testServiceProvider.TaskHandler.PreProcessing()
.ConfigureAwait(false);
- await testServiceProvider.TaskHandler.ExecuteTask()
- .ConfigureAwait(false);
-
- Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.PostProcessing()
+ Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask()
.ConfigureAwait(false));
var taskData = await testServiceProvider.TaskTable.ReadTaskAsync(taskId,
@@ -1501,8 +1500,7 @@ public async Task CancelLongTaskShouldSucceed()
await testServiceProvider.TaskHandler.PreProcessing()
.ConfigureAwait(false);
- await testServiceProvider.TaskHandler.ExecuteTask()
- .ConfigureAwait(false);
+ var exec = testServiceProvider.TaskHandler.ExecuteTask();
// Cancel task for test
@@ -1529,8 +1527,7 @@ await testServiceProvider.TaskHandler.StopCancelledTask()
await testServiceProvider.TaskHandler.StopCancelledTask()
.ConfigureAwait(false);
- Assert.That(testServiceProvider.TaskHandler.PostProcessing,
- Throws.InstanceOf());
+ Assert.ThrowsAsync(() => exec);
Assert.AreEqual(TaskStatus.Cancelling,
(await testServiceProvider.TaskTable.GetTaskStatus(new[]
diff --git a/Common/tests/TestBase/TaskTableTestBase.cs b/Common/tests/TestBase/TaskTableTestBase.cs
index ff6b0db59..69125bd1e 100644
--- a/Common/tests/TestBase/TaskTableTestBase.cs
+++ b/Common/tests/TestBase/TaskTableTestBase.cs
@@ -1167,6 +1167,48 @@ public void StartTaskShouldFail()
}
}
+ [Test]
+ [TestCase(TaskStatus.Completed)]
+ [TestCase(TaskStatus.Retried)]
+ [TestCase(TaskStatus.Error)]
+ [TestCase(TaskStatus.Cancelled)]
+ public async Task StartTaskInFinalStateShouldThrow(TaskStatus status)
+ {
+ if (RunTests)
+ {
+ var taskId = Guid.NewGuid()
+ .ToString();
+
+ await TaskTable!.CreateTasks(new[]
+ {
+ new TaskData("session",
+ taskId,
+ "owner",
+ "owner",
+ "payload",
+ new List(),
+ new List(),
+ new List(),
+ new List(),
+ status,
+ Options,
+ new Output(false,
+ "")),
+ })
+ .ConfigureAwait(false);
+
+ Assert.ThrowsAsync(async () =>
+ {
+ await TaskTable!.StartTask(taskSubmittedData_ with
+ {
+ TaskId = taskId,
+ },
+ CancellationToken.None)
+ .ConfigureAwait(false);
+ });
+ }
+ }
+
[Test]
public void DeleteTaskShouldFail()
{
diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs
index ac828ca04..de05597e6 100644
--- a/Compute/PollingAgent/src/Program.cs
+++ b/Compute/PollingAgent/src/Program.cs
@@ -96,6 +96,10 @@ public static async Task Main(string[] args)
.AddLocalStorage(builder.Configuration,
logger.GetLogger())
.AddHostedService()
+ .AddHostedService()
+ .AddHostedService()
+ .AddSingleton()
+ .AddSingleton()
.AddSingletonWithHealthCheck(nameof(Common.Pollster.Pollster))
.AddSingleton(logger)
.AddSingleton()
@@ -182,20 +186,13 @@ public static async Task Main(string[] args)
});
endpoints.MapGet("/taskprocessing",
- () => Task.FromResult(app.Services.GetRequiredService()
- .TaskProcessing));
+ () => Task.FromResult(string.Join(",",
+ app.Services.GetRequiredService()
+ .TaskProcessing)));
endpoints.MapGet("/stopcancelledtask",
- async () =>
- {
- var stopCancelledTask = app.Services.GetRequiredService()
- .StopCancelledTask;
- if (stopCancelledTask != null)
- {
- await stopCancelledTask.Invoke()
- .ConfigureAwait(false);
- }
- });
+ () => app.Services.GetRequiredService()
+ .StopCancelledTask());
});
var pushQueueStorage = app.Services.GetRequiredService();
diff --git a/Tests/Common/Client/src/GrpcChannelExt.cs b/Tests/Common/Client/src/GrpcChannelExt.cs
index a35ea0396..e9e8511a4 100644
--- a/Tests/Common/Client/src/GrpcChannelExt.cs
+++ b/Tests/Common/Client/src/GrpcChannelExt.cs
@@ -120,22 +120,32 @@ public static async Task LogStatsFromSessionAsync(this ChannelBase channel,
})
.ConfigureAwait(false))
{
- if (taskDetailed.Status is TaskStatus.Completed or TaskStatus.Error or TaskStatus.Retried)
+ try
{
- var useRatio = (taskDetailed.EndedAt - taskDetailed.StartedAt).ToTimeSpan()
- .TotalMilliseconds / (taskDetailed.EndedAt - taskDetailed.ReceivedAt).ToTimeSpan()
- .TotalMilliseconds;
-
- usageRatio.Add(useRatio);
+ if (taskDetailed.Status is TaskStatus.Completed or TaskStatus.Error or TaskStatus.Retried)
+ {
+ var useRatio = (taskDetailed.EndedAt - taskDetailed.StartedAt).ToTimeSpan()
+ .TotalMilliseconds / (taskDetailed.EndedAt - taskDetailed.ReceivedAt).ToTimeSpan()
+ .TotalMilliseconds;
+
+ usageRatio.Add(useRatio);
+ }
+
+ if (taskDetailed.DataDependencies.Count > 0)
+ {
+ taskAggregation.Add(taskDetailed);
+ }
+
+ taskDependencies.Add(taskDetailed.Id,
+ taskDetailed);
}
-
- if (taskDetailed.DataDependencies.Count > 0)
+ catch (Exception e)
{
- taskAggregation.Add(taskDetailed);
+ logger.LogError(e,
+ "Cannot process {@task}",
+ taskDetailed);
+ throw;
}
-
- taskDependencies.Add(taskDetailed.Id,
- taskDetailed);
}
var timediff = new List();
diff --git a/tools/logs2seq.py b/tools/logs2seq.py
index a8f00eacb..9ee2559d1 100644
--- a/tools/logs2seq.py
+++ b/tools/logs2seq.py
@@ -6,6 +6,11 @@
import boto3
import os
import gzip
+import logging
+from pathlib import Path
+from typing import IO
+from json.decoder import JSONDecodeError
+
# How to run seq in docker
# docker rm -f seqlogpipe
@@ -24,6 +29,11 @@
#
# export AWS_PROFILE=armonikDev
+logger = logging.getLogger(Path(__file__).name)
+logging.basicConfig(
+ level=logging.INFO
+)
+
parser = argparse.ArgumentParser(description="Download ArmoniK logs in JSON CLEF format from S3 bucket then send them to Seq.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("bucket_name", help="S3 bucket", type=str)
parser.add_argument("folder_name_core", help="Folder where core logs are located", type=str)
@@ -43,40 +53,51 @@
s3 = boto3.client('s3')
s3.download_file(args.bucket_name, obj_name, file_name)
-def process_json_log(url, file_name):
- batch = 0
+def process_json_log(url: str, file_name: str):
ctr = 0
- tosend = ""
+ tosend = b""
with open(file_name, "r") as file:
for line in file.readlines():
if line.startswith("{"):
- tosend += line + "\n"
- if batch > 100:
- requests.post(url, data=tosend)
- tosend = ""
- batch = 0
- batch = batch + 1
- ctr = ctr + 1
- print("sent :", ctr)
- if tosend != "":
+ try:
+ parsed = json.loads(line)
+ if "@t" not in parsed:
+ continue
+ ctr = ctr + 1
+ log_message = bytes(line + "\n", "utf-8")
+ if len(tosend) + len(log_message) > 100000:
+ requests.post(url, data=tosend)
+ tosend = log_message
+ else:
+ tosend += log_message
+ except JSONDecodeError as e:
+ logger.warning(f"Failed to parse JSON: {e}")
+ logger.info(f"sent : {ctr}")
+ if tosend != b"":
requests.post(url, data=tosend)
-def process_jsongz_log(url, file_name):
- batch = 0
+
+def process_jsongz_log(url: str, file_name: str):
ctr = 0
- tosend = ""
+ tosend = b""
with gzip.open(file_name, "r") as file:
for line in file.read().decode("utf-8").split("\n"):
if line.startswith("{"):
- tosend += line + "\n"
- if batch > 100:
- requests.post(url, data=tosend)
- tosend = ""
- batch = 0
- batch = batch + 1
- ctr = ctr + 1
- print("sent :", ctr)
- if tosend != "":
+ try:
+ parsed = json.loads(line)
+ if "@t" not in parsed:
+ continue
+ ctr = ctr + 1
+ log_message = bytes(line + "\n", "utf-8")
+ if len(tosend) + len(log_message) > 100000:
+ requests.post(url, data=tosend)
+ tosend = log_message
+ else:
+ tosend += log_message
+ except JSONDecodeError as e:
+ logger.warning(f"Failed to parse JSON: {e}")
+ logger.info(f"sent : {ctr}")
+ if tosend != b"":
requests.post(url, data=tosend)
if file_name.endswith(".json.tar.gz"):