diff --git a/Adaptors/Memory/src/TaskTable.cs b/Adaptors/Memory/src/TaskTable.cs index 2d12bc97c..d6310bb36 100644 --- a/Adaptors/Memory/src/TaskTable.cs +++ b/Adaptors/Memory/src/TaskTable.cs @@ -18,17 +18,14 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Collections.Immutable; using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using ArmoniK.Api.Common.Utils; -using ArmoniK.Api.gRPC.V1.Submitter; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; -using ArmoniK.Core.Common.gRPC.Convertors; using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Utils; @@ -128,20 +125,6 @@ public Task StartTask(TaskData taskData, return Task.CompletedTask; } - /// - public async Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => await ListTasksAsync(filter, - cancellationToken) - .Select(taskId => taskId2TaskData_[taskId] - .Status) - .GroupBy(status => status) - .SelectAwait(async grouping => new TaskStatusCount(grouping.Key, - await grouping.CountAsync(cancellationToken) - .ConfigureAwait(false))) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - /// public Task> CountTasksAsync(Expression> filter, CancellationToken cancellationToken = default) @@ -182,33 +165,6 @@ public Task DeleteTaskAsync(string id, out _)); } - /// - public IAsyncEnumerable ListTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - { - IEnumerable rawList = filter.IdsCase switch - { - TaskFilter.IdsOneofCase.None => - throw new ArgumentException("Filter is not properly initialized. Either the session or the tasks are required", - nameof(filter)), - TaskFilter.IdsOneofCase.Session => filter.Session.Ids.SelectMany(s => session2TaskIds_[s]) - .ToImmutableList(), - TaskFilter.IdsOneofCase.Task => filter.Task.Ids, - _ => throw new ArgumentException("Filter is set to an unknown IdsCase."), - }; - - return rawList.Where(taskId => filter.StatusesCase switch - { - TaskFilter.StatusesOneofCase.None => true, - TaskFilter.StatusesOneofCase.Included => filter.Included.Statuses.Contains(taskId2TaskData_[taskId] - .Status.ToGrpcStatus()), - TaskFilter.StatusesOneofCase.Excluded => !filter.Excluded.Statuses.Contains(taskId2TaskData_[taskId] - .Status.ToGrpcStatus()), - _ => throw new ArgumentException("Filter is set to an unknown StatusesCase."), - }) - .ToAsyncEnumerable(); - } - /// public Task<(IEnumerable tasks, long totalCount)> ListTasksAsync(Expression> filter, Expression> orderField, @@ -466,39 +422,9 @@ public Task Check(HealthCheckTag tag) : HealthCheckResult.Unhealthy()); /// - public async Task CountAllTasksAsync(TaskStatus status, - CancellationToken cancellationToken = default) - { - var count = 0; - - foreach (var session in session2TaskIds_.Keys) - { - var statusFilter = new TaskFilter - { - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - status.ToGrpcStatus(), - }, - }, - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - session, - }, - }, - }; - - count += await ListTasksAsync(statusFilter, - cancellationToken) - .CountAsync(cancellationToken) - .ConfigureAwait(false); - } - - return count; - } + public Task CountAllTasksAsync(TaskStatus status, + CancellationToken cancellationToken = default) + => Task.FromResult(taskId2TaskData_.Count(pair => pair.Value.Status == status)); /// public Task> GetTaskStatus(IEnumerable taskIds, diff --git a/Adaptors/MongoDB/src/Table/MongoQueryableExt.cs b/Adaptors/MongoDB/src/Table/MongoQueryableExt.cs deleted file mode 100644 index e72506cb1..000000000 --- a/Adaptors/MongoDB/src/Table/MongoQueryableExt.cs +++ /dev/null @@ -1,31 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using ArmoniK.Api.gRPC.V1.Submitter; -using ArmoniK.Core.Common.gRPC; -using ArmoniK.Core.Common.Storage; - -using MongoDB.Driver.Linq; - -namespace ArmoniK.Core.Adapters.MongoDB.Table; - -public static class MongoQueryableExt -{ - public static IMongoQueryable FilterQuery(this IMongoQueryable taskQueryable, - TaskFilter filter) - => taskQueryable.Where(filter.ToFilterExpression()); -} diff --git a/Adaptors/MongoDB/src/TaskTable.cs b/Adaptors/MongoDB/src/TaskTable.cs index 874e31a43..3c2cf697f 100644 --- a/Adaptors/MongoDB/src/TaskTable.cs +++ b/Adaptors/MongoDB/src/TaskTable.cs @@ -20,14 +20,11 @@ using System.Diagnostics; using System.Linq; using System.Linq.Expressions; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Submitter; using ArmoniK.Core.Adapters.MongoDB.Common; using ArmoniK.Core.Adapters.MongoDB.Options; -using ArmoniK.Core.Adapters.MongoDB.Table; using ArmoniK.Core.Adapters.MongoDB.Table.DataModel; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; @@ -678,43 +675,4 @@ public async Task> GetTaskStatus(IEnumerable t .ToListAsync(cancellationToken) .ConfigureAwait(false); } - - /// - public async Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - { - using var activity = activitySource_.StartActivity($"{nameof(CountTasksAsync)}"); - - var sessionHandle = sessionProvider_.Get(); - var taskCollection = taskCollectionProvider_.Get(); - - - var res = await taskCollection.AsQueryable(sessionHandle) - .FilterQuery(filter) - .GroupBy(model => model.Status) - .Select(models => new TaskStatusCount(models.Key, - models.Count())) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - - return res; - } - - /// - public async IAsyncEnumerable ListTasksAsync(TaskFilter filter, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - using var activity = activitySource_.StartActivity($"{nameof(ListTasksAsync)}"); - var sessionHandle = sessionProvider_.Get(); - var taskCollection = taskCollectionProvider_.Get(); - - await foreach (var taskId in taskCollection.AsQueryable(sessionHandle) - .FilterQuery(filter) - .Select(model => model.TaskId) - .ToAsyncEnumerable(cancellationToken) - .ConfigureAwait(false)) - { - yield return taskId; - } - } } diff --git a/Common/src/Storage/ITaskTable.cs b/Common/src/Storage/ITaskTable.cs index a44fd99f0..f70cf5458 100644 --- a/Common/src/Storage/ITaskTable.cs +++ b/Common/src/Storage/ITaskTable.cs @@ -22,7 +22,6 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Submitter; using ArmoniK.Core.Base; using ArmoniK.Core.Common.Exceptions; @@ -105,16 +104,6 @@ Task IsTaskCancelledAsync(string taskId, Task StartTask(TaskData taskData, CancellationToken cancellationToken = default); - /// - /// Count tasks matching a given filter - /// - /// Task Filter describing the tasks to be counted - /// Token used to cancel the execution of the method - /// - /// The number of tasks that matched the filter - /// - Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default); /// /// Count tasks matching a given filter @@ -158,17 +147,6 @@ Task CountAllTasksAsync(TaskStatus status, Task DeleteTaskAsync(string id, CancellationToken cancellationToken = default); - /// - /// List all tasks matching a given filter - /// - /// Task Filter describing the tasks to be counted - /// Token used to cancel the execution of the method - /// - /// List of tasks that matched the filter - /// - IAsyncEnumerable ListTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default); - /// /// List all tasks matching the given filter and ordering /// diff --git a/Common/src/Storage/TaskTableExtensions.cs b/Common/src/Storage/TaskTableExtensions.cs index 5925e3a40..744dbf4b4 100644 --- a/Common/src/Storage/TaskTableExtensions.cs +++ b/Common/src/Storage/TaskTableExtensions.cs @@ -22,11 +22,6 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Submitter; -using ArmoniK.Core.Common.Exceptions; -using ArmoniK.Core.Common.gRPC; -using ArmoniK.Core.Common.gRPC.Convertors; - namespace ArmoniK.Core.Common.Storage; public static class TaskTableExtensions @@ -40,14 +35,6 @@ public static class TaskTableExtensions TaskStatus.Timeout, }; - public static async Task CancelTasks(this ITaskTable taskTable, - TaskFilter filter, - CancellationToken cancellationToken = default) - => (int)await taskTable.UpdateAllTaskStatusAsync(filter, - TaskStatus.Cancelling, - cancellationToken) - .ConfigureAwait(false); - /// /// Change the status of the task to canceled /// @@ -192,38 +179,6 @@ public static async Task SetTaskRetryAsync(this ITaskTable taskTable, return task.Status != TaskStatus.Retried; } - /// - /// Update the statuses of all tasks matching a given filter - /// - /// Interface to manage tasks lifecycle - /// Task Filter describing the tasks whose status should be updated - /// The new task status - /// Token used to cancel the execution of the method - /// - /// The number of updated tasks - /// - public static async Task UpdateAllTaskStatusAsync(this ITaskTable taskTable, - TaskFilter filter, - TaskStatus status, - CancellationToken cancellationToken = default) - { - if (filter.Included != null && (filter.Included.Statuses.Contains(TaskStatus.Completed.ToGrpcStatus()) || - filter.Included.Statuses.Contains(TaskStatus.Cancelled.ToGrpcStatus()) || - filter.Included.Statuses.Contains(TaskStatus.Error.ToGrpcStatus()) || - filter.Included.Statuses.Contains(TaskStatus.Retried.ToGrpcStatus()))) - { - throw new ArmoniKException("The given TaskFilter contains a terminal state, update forbidden"); - } - - return await taskTable.UpdateManyTasks(filter.ToFilterExpression(), - new List<(Expression> selector, object? newValue)> - { - (tdm => tdm.Status, status), - }, - cancellationToken) - .ConfigureAwait(false); - } - /// /// Cancels all tasks in a given session /// diff --git a/Common/src/gRPC/Convertors/TaskTableExt.cs b/Common/src/gRPC/Convertors/TaskTableExt.cs new file mode 100644 index 000000000..4cb28db15 --- /dev/null +++ b/Common/src/gRPC/Convertors/TaskTableExt.cs @@ -0,0 +1,113 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1.Submitter; +using ArmoniK.Core.Common.Exceptions; +using ArmoniK.Core.Common.Storage; + +using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; + +namespace ArmoniK.Core.Common.gRPC.Convertors; + +public static class TaskTableExt +{ + /// + /// Count tasks matching a given filter + /// + /// Interface to manage tasks lifecycle + /// Task Filter describing the tasks to be counted + /// Token used to cancel the execution of the method + /// + /// The number of tasks that matched the filter + /// + public static Task> CountTasksAsync(this ITaskTable taskTable, + TaskFilter filter, + CancellationToken cancellationToken = default) + => taskTable.CountTasksAsync(filter.ToFilterExpression(), + cancellationToken); + + /// + /// List all tasks matching a given filter + /// + /// Interface to manage tasks lifecycle + /// Task Filter describing the tasks to be counted + /// Token used to cancel the execution of the method + /// + /// List of tasks that matched the filter + /// + public static IAsyncEnumerable ListTasksAsync(this ITaskTable taskTable, + TaskFilter filter, + CancellationToken cancellationToken = default) + => taskTable.FindTasksAsync(filter.ToFilterExpression(), + data => data.TaskId, + cancellationToken); + + /// + /// Update the statuses of all tasks matching a given filter + /// + /// Interface to manage tasks lifecycle + /// Task Filter describing the tasks whose status should be updated + /// The new task status + /// Token used to cancel the execution of the method + /// + /// The number of updated tasks + /// + public static async Task UpdateAllTaskStatusAsync(this ITaskTable taskTable, + TaskFilter filter, + TaskStatus status, + CancellationToken cancellationToken = default) + { + if (filter.Included != null && (filter.Included.Statuses.Contains(TaskStatus.Completed.ToGrpcStatus()) || + filter.Included.Statuses.Contains(TaskStatus.Cancelled.ToGrpcStatus()) || + filter.Included.Statuses.Contains(TaskStatus.Error.ToGrpcStatus()) || + filter.Included.Statuses.Contains(TaskStatus.Retried.ToGrpcStatus()))) + { + throw new ArmoniKException("The given TaskFilter contains a terminal state, update forbidden"); + } + + return await taskTable.UpdateManyTasks(filter.ToFilterExpression(), + new List<(Expression> selector, object? newValue)> + { + (tdm => tdm.Status, status), + }, + cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Cancel the tasks matching the given filter + /// + /// Interface to manage tasks lifecycle + /// Task Filter describing the tasks that should be cancelled + /// Token used to cancel the execution of the method + /// + /// The number of cancelled tasks + /// + public static async Task CancelTasks(this ITaskTable taskTable, + TaskFilter filter, + CancellationToken cancellationToken = default) + => (int)await taskTable.UpdateAllTaskStatusAsync(filter, + TaskStatus.Cancelling, + cancellationToken) + .ConfigureAwait(false); +} diff --git a/Common/tests/Helpers/SimpleTaskTable.cs b/Common/tests/Helpers/SimpleTaskTable.cs index 55604db23..1bd8f7563 100644 --- a/Common/tests/Helpers/SimpleTaskTable.cs +++ b/Common/tests/Helpers/SimpleTaskTable.cs @@ -22,7 +22,6 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Submitter; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Storage; @@ -97,14 +96,6 @@ public Task StartTask(TaskData taskData, CancellationToken cancellationToken = default) => Task.CompletedTask; - public Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => Task.FromResult>(new List - { - new(TaskStatus.Completed, - 42), - }); - public Task> CountTasksAsync(Expression> filter, CancellationToken cancellationToken = default) => Task.FromResult>(new List @@ -129,13 +120,6 @@ public Task DeleteTaskAsync(string id, CancellationToken cancellationToken = default) => Task.CompletedTask; - public IAsyncEnumerable ListTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => new List - { - TaskId, - }.ToAsyncEnumerable(); - public Task<(IEnumerable tasks, long totalCount)> ListTasksAsync(Expression> filter, Expression> orderField, Expression> selector, diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index a6d7794ab..7c51989a3 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -607,10 +607,6 @@ public Task StartTask(TaskData taskData, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - public Task> CountTasksAsync(Expression> filter, CancellationToken cancellationToken = default) => throw new NotImplementedException(); @@ -626,10 +622,6 @@ public Task DeleteTaskAsync(string id, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public IAsyncEnumerable ListTasksAsync(TaskFilter filter, - CancellationToken cancellationToken) - => throw new NotImplementedException(); - public Task<(IEnumerable tasks, long totalCount)> ListTasksAsync(Expression> filter, Expression> orderField, Expression> selector, diff --git a/Common/tests/Submitter/GrpcSubmitterServiceTests.cs b/Common/tests/Submitter/GrpcSubmitterServiceTests.cs index c9a8d2b9a..0e134d71e 100644 --- a/Common/tests/Submitter/GrpcSubmitterServiceTests.cs +++ b/Common/tests/Submitter/GrpcSubmitterServiceTests.cs @@ -1659,8 +1659,9 @@ await service.GetTaskStatus(new GetTaskStatusRequest public async Task ListTasksShouldSucceed() { var mock = new Mock(); - mock.Setup(taskTable => taskTable.ListTasksAsync(It.IsAny(), - CancellationToken.None)) + mock.Setup(taskTable => taskTable.FindTasksAsync(It.IsAny>>(), + It.IsAny>>(), + It.IsAny())) .Returns(() => new List { "TaskId", @@ -1696,8 +1697,9 @@ public async Task ListTasksShouldSucceed() public async Task ListTaskExceptionShouldThrow() { var mock = new Mock(); - mock.Setup(taskTable => taskTable.ListTasksAsync(It.IsAny(), - CancellationToken.None)) + mock.Setup(taskTable => taskTable.FindTasksAsync(It.IsAny>>(), + It.IsAny>>(), + It.IsAny())) .Returns(() => throw new Exception()); var service = new GrpcSubmitterService(mockSubmitter_.Object, @@ -1734,8 +1736,9 @@ await service.ListTasks(new TaskFilter public async Task ListTaskArmonikExceptionShouldThrow() { var mock = new Mock(); - mock.Setup(taskTable => taskTable.ListTasksAsync(It.IsAny(), - CancellationToken.None)) + mock.Setup(taskTable => taskTable.FindTasksAsync(It.IsAny>>(), + It.IsAny>>(), + It.IsAny())) .Returns(() => throw new ArmoniKException()); var service = new GrpcSubmitterService(mockSubmitter_.Object, @@ -1768,44 +1771,6 @@ await service.ListTasks(new TaskFilter } } - [Test] - public async Task ListTaskTaskNotFoundExceptionShouldThrow() - { - var mock = new Mock(); - mock.Setup(taskTable => taskTable.ListTasksAsync(It.IsAny(), - CancellationToken.None)) - .Returns(() => throw new TaskNotFoundException()); - - var service = new GrpcSubmitterService(mockSubmitter_.Object, - mock.Object, - mockSessionTable_.Object, - mockResultTable_.Object, - NullLogger.Instance); - - try - { - await service.ListTasks(new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - "TaskId", - }, - }, - }, - TestServerCallContext.Create()) - .ConfigureAwait(false); - Assert.Fail(); - } - catch (RpcException e) - { - Console.WriteLine(e); - Assert.AreEqual(StatusCode.NotFound, - e.StatusCode); - } - } - [Test] [Obsolete("Method tested is obsolete")] public async Task GetResultStatusAsyncArmoniKNotFoundExceptionShouldThrow() diff --git a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs index 539d2d582..21761d1fa 100644 --- a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs +++ b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs @@ -864,10 +864,6 @@ public Task StartTask(TaskData taskData, CancellationToken cancellationToken = default) => throw new T(); - public Task> CountTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => throw new T(); - public Task> CountTasksAsync(Expression> filter, CancellationToken cancellationToken = default) => throw new T(); @@ -883,10 +879,6 @@ public Task DeleteTaskAsync(string id, CancellationToken cancellationToken = default) => throw new T(); - public IAsyncEnumerable ListTasksAsync(TaskFilter filter, - CancellationToken cancellationToken = default) - => throw new T(); - public Task<(IEnumerable tasks, long totalCount)> ListTasksAsync(Expression> filter, Expression> orderField, Expression> selector,