diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index bf098cf3a..9467a034b 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -43,6 +43,8 @@ using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; +using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; + namespace ArmoniK.Core.Common.Pollster; public class Pollster : IInitializable @@ -170,12 +172,7 @@ public async Task Init(CancellationToken cancellationToken) public async Task Check(HealthCheckTag tag) { - if (healthCheckFailedResult_ is not null) - { - return healthCheckFailedResult_ ?? HealthCheckResult.Unhealthy("Health Check failed previously so this polling agent should be destroyed."); - } - - if (endLoopReached_) + if (endLoopReached_ && taskProcessingDict_.IsEmpty) { return HealthCheckResult.Unhealthy("End of main loop reached, no more tasks will be executed."); } @@ -234,6 +231,12 @@ public async Task Check(HealthCheckTag tag) healthCheckFailedResult_ = result; } + if (tag == HealthCheckTag.Liveness && taskProcessingDict_.Any(pair => pair.Value.GetAcquiredTaskInfoOrNull() + ?.TaskStatus is not null and not TaskStatus.Dispatched)) + { + return HealthCheckResult.Healthy(); + } + if (tag == HealthCheckTag.Readiness && taskProcessingDict_.IsEmpty) { return HealthCheckResult.Unhealthy("No tasks to process"); @@ -264,11 +267,10 @@ await Init(exceptionManager_.EarlyCancellationToken) if (healthCheckFailedResult_ is not null) { var hcr = healthCheckFailedResult_.Value; - exceptionManager_.FatalError(logger_, - hcr.Exception, - "Health Check failed with status {Status} thus no more tasks will be executed:\n{Description}", - hcr.Status, - hcr.Description); + logger_.LogError(hcr.Exception, + "Health Check failed with status {Status} thus no more tasks will be acquired (tasks already acquired will be executed to completion if possible):\n{Description}", + hcr.Status, + hcr.Description); return; } diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index cb6ae1329..0fbc7595a 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -707,12 +707,21 @@ await ReleaseAndPostponeTask() /// The metadata of the task /// public TaskInfo GetAcquiredTaskInfo() + => GetAcquiredTaskInfoOrNull() ?? throw new ArmoniKException("TaskData should not be null after successful acquisition"); + + /// + /// Get the meta data of the acquired task + /// + /// + /// The metadata of the task or null + /// + public TaskInfo? GetAcquiredTaskInfoOrNull() => taskData_ is not null ? new TaskInfo(taskData_.SessionId, taskData_.TaskId, messageHandler_.MessageId, taskData_.Status) - : throw new ArmoniKException("TaskData should not be null after successful acquisition"); + : null; /// /// Release task from the current agent and set message to diff --git a/Common/tests/Helpers/SimplePullQueueStorageChannel.cs b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs index ce3aee20e..8091c5bf4 100644 --- a/Common/tests/Helpers/SimplePullQueueStorageChannel.cs +++ b/Common/tests/Helpers/SimplePullQueueStorageChannel.cs @@ -32,10 +32,11 @@ namespace ArmoniK.Core.Common.Tests.Helpers; public class SimplePullQueueStorageChannel : IPullQueueStorage, IPushQueueStorage { - public readonly Channel Channel = System.Threading.Channels.Channel.CreateUnbounded(); + public readonly Channel Channel = System.Threading.Channels.Channel.CreateUnbounded(); + public HealthCheckResult CheckResult = HealthCheckResult.Healthy(); public Task Check(HealthCheckTag tag) - => Task.FromResult(HealthCheckResult.Healthy()); + => Task.FromResult(CheckResult); public Task Init(CancellationToken cancellationToken) => Task.CompletedTask; diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index f9ea55cea..a1420cb08 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -376,7 +376,6 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) // This test that we return from the mainloop after the health check is unhealthy await testServiceProvider.Pollster.MainLoop() .ConfigureAwait(false); - Assert.True(testServiceProvider.ExceptionManager.Failed); } [Test] @@ -449,10 +448,14 @@ await Task.Delay(TimeSpan.FromMilliseconds(delay_), [Test] [Timeout(10000)] - public async Task ExecuteTaskShouldSucceed() + public async Task ExecuteTaskShouldSucceed([Values] HealthStatus healthStatus) { - var mockPullQueueStorage = new SimplePullQueueStorageChannel(); - var waitWorkerStreamHandler = new SimpleWorkerStreamHandler(); + var mockPullQueueStorage = new SimplePullQueueStorageChannel + { + CheckResult = new HealthCheckResult(healthStatus, + $"value for test called {nameof(ExecuteTaskShouldSucceed)}"), + }; + var waitWorkerStreamHandler = new WaitWorkerStreamHandler(500); var simpleAgentHandler = new SimpleAgentHandler(); using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler, @@ -480,8 +483,13 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(1)); + var loop = testServiceProvider.Pollster.MainLoop(); - Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + // checking health should not interrupt task execution + var res = await testServiceProvider.Pollster.Check(HealthCheckTag.Liveness) + .ConfigureAwait(false); + + Assert.DoesNotThrowAsync(() => loop); Assert.DoesNotThrowAsync(() => stop); Assert.AreEqual(TaskStatus.Completed,