diff --git a/ArmoniK.Extensions.Csharp.sln b/ArmoniK.Extensions.Csharp.sln index d5242819..a67d6a8c 100644 --- a/ArmoniK.Extensions.Csharp.sln +++ b/ArmoniK.Extensions.Csharp.sln @@ -49,10 +49,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Symphony", "Symphony", "{E5 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DLLWorker", "DLLWorker", "{AB285F22-A32F-4C5C-A6B3-294E347BFFAE}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Symphony", "Symphony", "{2343D895-0821-4EBB-A56D-C58F817D5FF4}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Unified", "Unified", "{34DA3A29-FD3C-462B-BD35-38D699C4D901}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.EndToEndTests.Common", "Tests\ArmoniK.EndToEndTests\ArmoniK.EndToEndTests.Common\ArmoniK.EndToEndTests.Common.csproj", "{E7AE7482-42A7-4113-AB1E-EBECE53AF6CA}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.EndToEndTests.Client", "Tests\ArmoniK.EndToEndTests\ArmoniK.EndToEndTests.Client\ArmoniK.EndToEndTests.Client.csproj", "{7E5AE5BF-099E-4E00-B7CB-1C80FDC7C193}" @@ -61,6 +57,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.EndToEndTests.Worke EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.DevelopmentKit.Common.Tests", "Tests\ArmoniK.DevelopmentKit.Common.Tests\ArmoniK.DevelopmentKit.Common.Tests.csproj", "{84BB2691-33F0-45B4-8D63-0ECF82708CFC}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Common", "Common", "{129C2A61-56BD-4E29-9B25-8ADC98EC31CB}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "EndToEnd", "EndToEnd", "{7C63FF64-D798-4BD1-A461-BEE09B3D1BAC}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{B837CF75-270B-4354-9809-61360BB802FB}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.DevelopmentKit.Client.Common.Tests", "Client\tests\ArmoniK.DevelopmentKit.Client.Common.Tests\ArmoniK.DevelopmentKit.Client.Common.Tests.csproj", "{637ABAA8-2000-42F1-867A-B3C6531773D4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -115,6 +119,10 @@ Global {84BB2691-33F0-45B4-8D63-0ECF82708CFC}.Debug|Any CPU.Build.0 = Debug|Any CPU {84BB2691-33F0-45B4-8D63-0ECF82708CFC}.Release|Any CPU.ActiveCfg = Release|Any CPU {84BB2691-33F0-45B4-8D63-0ECF82708CFC}.Release|Any CPU.Build.0 = Release|Any CPU + {637ABAA8-2000-42F1-867A-B3C6531773D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {637ABAA8-2000-42F1-867A-B3C6531773D4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {637ABAA8-2000-42F1-867A-B3C6531773D4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {637ABAA8-2000-42F1-867A-B3C6531773D4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -138,12 +146,14 @@ Global {E6F98032-63BB-41CC-8E71-5E5DB54C3A87} = {9EFC0507-3071-4F7D-BFDD-6D99880D80C4} {E5766860-034C-4F83-907B-922205DD2E0E} = {9EFC0507-3071-4F7D-BFDD-6D99880D80C4} {AB285F22-A32F-4C5C-A6B3-294E347BFFAE} = {9EFC0507-3071-4F7D-BFDD-6D99880D80C4} - {2343D895-0821-4EBB-A56D-C58F817D5FF4} = {29622951-3654-41F0-9393-11D6737FD1F0} - {34DA3A29-FD3C-462B-BD35-38D699C4D901} = {29622951-3654-41F0-9393-11D6737FD1F0} - {E7AE7482-42A7-4113-AB1E-EBECE53AF6CA} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} - {7E5AE5BF-099E-4E00-B7CB-1C80FDC7C193} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} - {B960962F-4CB1-480D-8D10-9DE2990896B7} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} - {84BB2691-33F0-45B4-8D63-0ECF82708CFC} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} + {E7AE7482-42A7-4113-AB1E-EBECE53AF6CA} = {7C63FF64-D798-4BD1-A461-BEE09B3D1BAC} + {7E5AE5BF-099E-4E00-B7CB-1C80FDC7C193} = {7C63FF64-D798-4BD1-A461-BEE09B3D1BAC} + {B960962F-4CB1-480D-8D10-9DE2990896B7} = {7C63FF64-D798-4BD1-A461-BEE09B3D1BAC} + {84BB2691-33F0-45B4-8D63-0ECF82708CFC} = {129C2A61-56BD-4E29-9B25-8ADC98EC31CB} + {129C2A61-56BD-4E29-9B25-8ADC98EC31CB} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} + {7C63FF64-D798-4BD1-A461-BEE09B3D1BAC} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} + {B837CF75-270B-4354-9809-61360BB802FB} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} + {637ABAA8-2000-42F1-867A-B3C6531773D4} = {B837CF75-270B-4354-9809-61360BB802FB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {1285A466-2AF6-43E6-8DCC-2F93A5D5F02E} diff --git a/ArmoniK.Extensions.Csharp.sln.DotSettings b/ArmoniK.Extensions.Csharp.sln.DotSettings index 14d4fcc0..3423df96 100644 --- a/ArmoniK.Extensions.Csharp.sln.DotSettings +++ b/ArmoniK.Extensions.Csharp.sln.DotSettings @@ -26,6 +26,7 @@ True True True + True True True True diff --git a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj index 63c053a2..54fac280 100644 --- a/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj +++ b/Client/src/Common/ArmoniK.DevelopmentKit.Client.Common.csproj @@ -1,7 +1,7 @@ - net472;net48;netstandard2.0;net5.0;net6.0 + net472;net48;netstandard2.0;net6.0 Library True true @@ -11,6 +11,8 @@ + + @@ -21,4 +23,8 @@ + + + + diff --git a/Client/src/Common/Exceptions/ServiceInvocationException.cs b/Client/src/Common/Exceptions/ServiceInvocationException.cs index aadb5185..3d29a229 100644 --- a/Client/src/Common/Exceptions/ServiceInvocationException.cs +++ b/Client/src/Common/Exceptions/ServiceInvocationException.cs @@ -36,12 +36,12 @@ public class ServiceInvocationException : Exception /// The default constructor /// /// The message to set for the exception - /// the statusCode in the output - public ServiceInvocationException(string message, - ArmonikStatusCode statusCode) + /// the taskStatusCode in the output + public ServiceInvocationException(string message, + ArmonikTaskStatusCode taskStatusCode) { - message_ = message; - StatusCode = statusCode; + message_ = message; + TaskStatusCode = taskStatusCode; } /// @@ -57,14 +57,14 @@ public ServiceInvocationException(Exception e) /// The overriden constructor to accept inner Exception as parameters /// /// The previous exception - /// The status of the task which is failing - public ServiceInvocationException(Exception e, - ArmonikStatusCode statusCode) + /// The status of the task which is failing + public ServiceInvocationException(Exception e, + ArmonikTaskStatusCode taskStatusCode) : base(e.Message, e) { - StatusCode = statusCode; - message_ = $"{message_} with InnerException {e.GetType()} message : {e.Message}"; + TaskStatusCode = taskStatusCode; + message_ = $"{message_} with InnerException {e.GetType()} message : {e.Message}"; } /// @@ -72,21 +72,21 @@ public ServiceInvocationException(Exception e, /// /// The message to set in the exception /// The previous exception generated by failure - /// The status of the task which is failing - public ServiceInvocationException(string message, - ArgumentException e, - ArmonikStatusCode statusCode) + /// The status of the task which is failing + public ServiceInvocationException(string message, + ArgumentException e, + ArmonikTaskStatusCode taskStatusCode) : base(message, e) { - message_ = message; - StatusCode = statusCode; + message_ = message; + TaskStatusCode = taskStatusCode; } /// /// The status code when error occurred /// - public ArmonikStatusCode StatusCode { get; } + public ArmonikTaskStatusCode TaskStatusCode { get; } /// /// The error details coming from TaskOutput API diff --git a/Client/src/Common/Status/ArmonikStatusCode.cs b/Client/src/Common/Status/ArmonikTaskStatusCode.cs similarity index 97% rename from Client/src/Common/Status/ArmonikStatusCode.cs rename to Client/src/Common/Status/ArmonikTaskStatusCode.cs index 2b45be1b..f68e29f8 100644 --- a/Client/src/Common/Status/ArmonikStatusCode.cs +++ b/Client/src/Common/Status/ArmonikTaskStatusCode.cs @@ -24,7 +24,7 @@ namespace ArmoniK.DevelopmentKit.Client.Common.Status; /// List of status for task and result in Armonik /// [PublicAPI] -public enum ArmonikStatusCode +public enum ArmonikTaskStatusCode { /// /// Unknown status of task or result @@ -34,7 +34,6 @@ public enum ArmonikStatusCode /// /// The task is completed but result could not be ready /// - [Obsolete("unused")] TaskCompleted, /// diff --git a/Client/src/Common/Status/ResultStatusData.cs b/Client/src/Common/Status/ResultStatusData.cs index 9bb80f6b..4c5cf565 100644 --- a/Client/src/Common/Status/ResultStatusData.cs +++ b/Client/src/Common/Status/ResultStatusData.cs @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -using ArmoniK.Api.gRPC.V1; +using ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; using JetBrains.Annotations; @@ -27,6 +27,6 @@ namespace ArmoniK.DevelopmentKit.Client.Common.Status; /// The id of the task producing the result /// The status of the result [PublicAPI] -public sealed record ResultStatusData(string ResultId, - string TaskId, - ResultStatus Status); +public sealed record ResultStatusData(string ResultId, + string TaskId, + ArmoniKResultStatus Status); diff --git a/Client/src/Common/Status/TaskStatusExt.cs b/Client/src/Common/Status/TaskStatusExt.cs index 0a74506e..06e063e8 100644 --- a/Client/src/Common/Status/TaskStatusExt.cs +++ b/Client/src/Common/Status/TaskStatusExt.cs @@ -28,20 +28,20 @@ public static class TaskStatusExt /// /// the native API status to convert /// the SDK status - public static ArmonikStatusCode ToArmonikStatusCode(this TaskStatus taskStatus) + public static ArmonikTaskStatusCode ToArmonikStatusCode(this TaskStatus taskStatus) => taskStatus switch { - TaskStatus.Submitted => ArmonikStatusCode.ResultNotReady, - TaskStatus.Timeout => ArmonikStatusCode.TaskTimeout, - TaskStatus.Cancelled => ArmonikStatusCode.TaskCancelled, - TaskStatus.Cancelling => ArmonikStatusCode.TaskCancelled, - TaskStatus.Error => ArmonikStatusCode.TaskFailed, - TaskStatus.Processing => ArmonikStatusCode.ResultNotReady, - TaskStatus.Dispatched => ArmonikStatusCode.ResultNotReady, - TaskStatus.Completed => ArmonikStatusCode.ResultNotReady, - TaskStatus.Creating => ArmonikStatusCode.ResultNotReady, - TaskStatus.Unspecified => ArmonikStatusCode.TaskFailed, - TaskStatus.Processed => ArmonikStatusCode.ResultReady, - _ => ArmonikStatusCode.Unknown, + TaskStatus.Submitted => ArmonikTaskStatusCode.ResultNotReady, + TaskStatus.Timeout => ArmonikTaskStatusCode.TaskTimeout, + TaskStatus.Cancelled => ArmonikTaskStatusCode.TaskCancelled, + TaskStatus.Cancelling => ArmonikTaskStatusCode.TaskCancelled, + TaskStatus.Error => ArmonikTaskStatusCode.TaskFailed, + TaskStatus.Processing => ArmonikTaskStatusCode.ResultNotReady, + TaskStatus.Dispatched => ArmonikTaskStatusCode.ResultNotReady, + TaskStatus.Completed => ArmonikTaskStatusCode.TaskCompleted, + TaskStatus.Creating => ArmonikTaskStatusCode.ResultNotReady, + TaskStatus.Unspecified => ArmonikTaskStatusCode.Unknown, + TaskStatus.Processed => ArmonikTaskStatusCode.ResultReady, + _ => ArmonikTaskStatusCode.Unknown, }; } diff --git a/Client/src/Common/Submitter/ApiExt/ArmoniKException.cs b/Client/src/Common/Submitter/ApiExt/ArmoniKException.cs new file mode 100644 index 00000000..5d146eca --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/ArmoniKException.cs @@ -0,0 +1,49 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Runtime.Serialization; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +[PublicAPI] +public class ArmoniKException : ApplicationException +{ + /// + public ArmoniKException(string? message, + Exception? innerException) + : base(message, + innerException) + { + } + + /// + public ArmoniKException(string? message) + : base(message) + { + } + + /// + public ArmoniKException(SerializationInfo info, + StreamingContext context) + : base(info, + context) + { + } +} diff --git a/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatus.cs b/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatus.cs new file mode 100644 index 00000000..51192df8 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatus.cs @@ -0,0 +1,31 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// Enum representing the status of a result +/// +[PublicAPI] +public enum ArmoniKResultStatus +{ + Unknown, + Ready, + NotReady, + Error, +} diff --git a/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatusExt.cs b/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatusExt.cs new file mode 100644 index 00000000..864332aa --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/ArmoniKResultStatusExt.cs @@ -0,0 +1,37 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; + +using ArmoniK.Api.gRPC.V1; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +internal static class ArmoniKResultStatusExt +{ + public static ArmoniKResultStatus ToArmoniKResultStatus(this ResultStatus resultStatus) + => resultStatus switch + { + ResultStatus.Unspecified => ArmoniKResultStatus.Unknown, + ResultStatus.Created => ArmoniKResultStatus.NotReady, + ResultStatus.Completed => ArmoniKResultStatus.Ready, + ResultStatus.Aborted => ArmoniKResultStatus.Error, + ResultStatus.Notfound => ArmoniKResultStatus.Error, + _ => throw new ArgumentOutOfRangeException(nameof(resultStatus), + resultStatus, + null), + }; +} diff --git a/Client/src/Common/Submitter/ApiExt/GrpcArmoniKClient.cs b/Client/src/Common/Submitter/ApiExt/GrpcArmoniKClient.cs new file mode 100644 index 00000000..38e83e81 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/GrpcArmoniKClient.cs @@ -0,0 +1,420 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.Client; +using ArmoniK.Api.Client.Submitter; +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Results; +using ArmoniK.Api.gRPC.V1.Submitter; +using ArmoniK.Api.gRPC.V1.Tasks; +using ArmoniK.DevelopmentKit.Client.Common.Status; + +using JetBrains.Annotations; + +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// Implements the ISubmitter by using the ArmoniK gRPC API. +/// +[PublicAPI] +// TODO: should be in ArmoniK.Api +public class GrpcArmoniKClient : IArmoniKClient +{ + private readonly Func channelFactory_; + + /// + /// Builds an instance of the armoniKClient + /// + /// Used to call the grpc API + public GrpcArmoniKClient(Func channelFactory) + => channelFactory_ = channelFactory; + + /// + public async Task> SubmitTasksAsync(string sessionId, + IEnumerable definitions, + TaskOptions taskOptions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + try + { + var nbRequests = 0; + var taskRequests = definitions.Select(definition => + { + nbRequests++; + return definition.ToTaskRequest(); + }); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + var response = await service.CreateTasksAsync(sessionId, + taskOptions, + taskRequests, + cancellationToken) + .ConfigureAwait(false); + + var output = response.ResponseCase switch + { + CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses, + // Maybe: use another kind of exception to ease the definition of a better retry policy? + CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks"), + CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with remote service"), + _ => throw new InvalidOperationException(), + }; + + + var nbTaskIds = 0; + foreach (var status in output) + { + cancellationToken.ThrowIfCancellationRequested(); + nbTaskIds++; + switch (status.StatusCase) + { + case CreateTaskReply.Types.CreationStatus.StatusOneofCase.TaskInfo: + break; + case CreateTaskReply.Types.CreationStatus.StatusOneofCase.Error: + throw new ArmoniKException(status.Error); + case CreateTaskReply.Types.CreationStatus.StatusOneofCase.None: + default: + throw new ArmoniKException($"TaskStatus error for a task in {nameof(SubmitTasksAsync)}"); + } + } + + if (nbRequests != nbTaskIds) + { + throw new ArmoniKException($"Number of taskId received ({nbTaskIds}) does not correspond to the number of tasks sent ({nbRequests}."); + } + + return output.Select(status => new TaskInfo(status.TaskInfo)); + } + catch (ArmoniKException) + { + throw; + } + catch (Exception e) + { + throw new ArmoniKException($"An unexpected error occurred: {e.Message}", + e); + } + } + + /// + public async Task> GetResultIdsAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Tasks.TasksClient(guard.Channel); + + var response = await service.GetResultIdsAsync(new GetResultIdsRequest + { + TaskId = + { + taskIds, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return response.TaskResults.Select(result => new TaskOutputIds(result)); + } + + /// + public async Task DownloadResultAsync(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Results.ResultsClient(guard.Channel); + + return await service.DownloadResultData(sessionId, + resultId, + cancellationToken); + } + + /// + public async Task CreateSessionAsync(TaskOptions taskOptions, + IReadOnlyCollection partitions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + var session = await service.CreateSessionAsync(new CreateSessionRequest + { + DefaultTaskOption = taskOptions, + PartitionIds = + { + partitions, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return session.SessionId; + } + + /// + public async Task> GetTaskStatusAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + var statuses = await service.GetTaskStatusAsync(new GetTaskStatusRequest + { + TaskIds = + { + taskIds, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return statuses.IdStatuses.Select(status => new TaskIdStatus(status.TaskId, + status.Status.ToArmonikStatusCode())); + } + + /// + public async Task> GetResultStatusAsync(string sessionId, + IReadOnlyCollection resultIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + + // TODO: replace with a logic based on service.TryGetResultStream + var statuses = await service.GetResultStatusAsync(new GetResultStatusRequest + { + ResultIds = + { + resultIds, + }, + SessionId = sessionId, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return statuses.IdStatuses.Select(status => new ResultIdStatus(status.ResultId, + status.Status.ToArmoniKResultStatus())); + } + + /// + public async Task TryGetTaskErrorAsync(string sessionId, + string taskId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + var output = await service.TryGetTaskOutputAsync(new TaskOutputRequest + { + Session = sessionId, + TaskId = taskId, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return output.TypeCase switch + { + Output.TypeOneofCase.Ok => null, + Output.TypeOneofCase.Error => output.Error.Details, + Output.TypeOneofCase.None => throw new InvalidOperationException(), + _ => throw new InvalidOperationException(), + }; + } + + /// + public async Task> WaitForCompletionAsync(string sessionId, + IReadOnlyCollection taskIds, + bool stopOnFirstTaskCancellation, + bool stopOnFirstTaskError, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + var count = await service.WaitForCompletionAsync(new WaitRequest + { + Filter = new TaskFilter + { + Task = new TaskFilter.Types.IdsRequest + { + Ids = + { + taskIds, + }, + }, + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + }, + StopOnFirstTaskCancellation = stopOnFirstTaskCancellation, + StopOnFirstTaskError = stopOnFirstTaskError, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return count.Values.ToImmutableDictionary(statusCount => statusCount.Status, + statusCount => statusCount.Count); + } + + /// + public async Task WaitForAvailability(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(guard.Channel); + + // TODO: replace with a logic based on service.TryGetResultStream + var reply = await service.WaitForAvailabilityAsync(new ResultRequest + { + ResultId = resultId, + Session = sessionId, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + switch (reply.TypeCase) + { + case AvailabilityReply.TypeOneofCase.Ok: + return; + case AvailabilityReply.TypeOneofCase.Error: + throw new ArmoniKException($"Result in Error - {resultId}\nMessage :\n{string.Join("Inner message:\n", reply.Error.Errors)}"); + case AvailabilityReply.TypeOneofCase.NotCompletedTask: + throw new ArmoniKException($"Result {resultId} was not yet completed"); + case AvailabilityReply.TypeOneofCase.None: + default: + throw new InvalidOperationException(); + } + } + + /// + public async Task>> CreateResultMetaDataAsync(string sessionId, + IReadOnlyCollection resultNames, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + { + ValidateRetryArguments(maxRetries, + totalTimeoutMs); + + using var guard = channelFactory_(); + + var service = new Results.ResultsClient(guard.Channel); + + + var resultsMetaData = await service.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + SessionId = sessionId, + Results = + { + resultNames.Select(resultName => new CreateResultsMetaDataRequest.Types.ResultCreate + { + Name = resultName, + }), + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return resultsMetaData.Results.Select(result => new KeyValuePair(result.Name, + result.ResultId)); + } + + internal static void ValidateRetryArguments(int maxRetries, + double totalTimeoutMs) + { + // ReSharper disable once CompareOfFloatsByEqualityOperator + if (maxRetries != 1 || totalTimeoutMs != 1e10) + { + throw new ArgumentOutOfRangeException(nameof(maxRetries), + $""" + {nameof(GrpcArmoniKClient)} has no retry policy and only support a single call. + {maxRetries} must be equal to 1. + {totalTimeoutMs} must be equal to 1e10 ({1e10}). + """); + } + } +} diff --git a/Client/src/Common/Submitter/ApiExt/IArmoniKClient.cs b/Client/src/Common/Submitter/ApiExt/IArmoniKClient.cs new file mode 100644 index 00000000..075bee79 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/IArmoniKClient.cs @@ -0,0 +1,150 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; + +using JetBrains.Annotations; + +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// Provides access to the ArmoniK Submitter Service +/// +[PublicAPI] +// TODO: should be in ArmoniK.Api +// TODO: Should be split in different services interfaces +public interface IArmoniKClient +{ + /// Default task options for the session + /// List of all partition that will be used during the session + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task CreateSessionAsync(TaskOptions taskOptions, + IReadOnlyCollection partitions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// Definition of the tasks + /// Task options for these tasks + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task> SubmitTasksAsync(string sessionId, + IEnumerable definitions, + TaskOptions taskOptions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the tasks + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task> GetResultIdsAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// Id of the result + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task DownloadResultAsync(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the tasks + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task> GetTaskStatusAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task> GetResultStatusAsync(string sessionId, + IReadOnlyCollection resultIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// Id of the task + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task TryGetTaskErrorAsync(string sessionId, + string taskId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// Id of the tasks + /// + /// + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task> WaitForCompletionAsync(string sessionId, + IReadOnlyCollection taskIds, + bool stopOnFirstTaskCancellation, + bool stopOnFirstTaskError, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + /// Id of the session + /// + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task WaitForAvailability(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); + + + /// Id of the session + /// + /// Number of times the call must be retried. Default=1 + /// Define a timeout for the global call (including all retries) + /// + public Task>> CreateResultMetaDataAsync(string sessionId, + IReadOnlyCollection resultNames, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default); +} diff --git a/Client/src/Common/Submitter/ApiExt/ResultIdStatus.cs b/Client/src/Common/Submitter/ApiExt/ResultIdStatus.cs new file mode 100644 index 00000000..405a4b68 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/ResultIdStatus.cs @@ -0,0 +1,39 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using ArmoniK.Api.gRPC.V1.Submitter; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// +/// +/// +[PublicAPI] +public record ResultIdStatus(string ResultId, + ArmoniKResultStatus TaskStatus) +{ + /// + /// + /// + public ResultIdStatus(GetResultStatusReply.Types.IdStatus idStatus) + : this(idStatus.ResultId, + idStatus.Status.ToArmoniKResultStatus()) + { + } +} diff --git a/Client/src/Common/Submitter/ApiExt/RetryArmoniKClient.cs b/Client/src/Common/Submitter/ApiExt/RetryArmoniKClient.cs new file mode 100644 index 00000000..515d66ed --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/RetryArmoniKClient.cs @@ -0,0 +1,325 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; + +using Grpc.Core; + +using JetBrains.Annotations; + +using Microsoft.Extensions.Logging; + +using Polly; +using Polly.Contrib.WaitAndRetry; + +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// This ArmoniKClient uses a retry policy in case of failure from the execution of the call to the underlying +/// ArmoniKClient +/// +[PublicAPI] +// TODO: should be in ArmoniK.Api +public class RetryArmoniKClient : IArmoniKClient +{ + private readonly IArmoniKClient armoniKClient_; + private readonly ILogger logger_; + + /// + /// Default constructor + /// + /// + /// + public RetryArmoniKClient(ILogger logger, + IArmoniKClient armoniKClient) + { + logger_ = logger; + armoniKClient_ = armoniKClient; + } + + /// + public async Task> SubmitTasksAsync(string sessionId, + IEnumerable definitions, + TaskOptions taskOptions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.SubmitTasksAsync(sessionId, + definitions.ToList(), + taskOptions, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task> GetResultIdsAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.GetResultIdsAsync(taskIds, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task DownloadResultAsync(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.DownloadResultAsync(sessionId, + resultId, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task CreateSessionAsync(TaskOptions taskOptions, + IReadOnlyCollection partitions, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.CreateSessionAsync(taskOptions, + partitions, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task> GetTaskStatusAsync(IReadOnlyCollection taskIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.GetTaskStatusAsync(taskIds, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task> GetResultStatusAsync(string sessionId, + IReadOnlyCollection resultIds, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.GetResultStatusAsync(sessionId, + resultIds, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task TryGetTaskErrorAsync(string sessionId, + string taskId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.TryGetTaskErrorAsync(sessionId, + taskId, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task> WaitForCompletionAsync(string sessionId, + IReadOnlyCollection taskIds, + bool stopOnFirstTaskCancellation, + bool stopOnFirstTaskError, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.WaitForCompletionAsync(sessionId, + taskIds, + stopOnFirstTaskCancellation, + stopOnFirstTaskError, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task WaitForAvailability(string sessionId, + string resultId, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.WaitForAvailability(sessionId, + resultId, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + /// + public async Task>> CreateResultMetaDataAsync(string sessionId, + IReadOnlyCollection resultNames, + int maxRetries = 1, + double totalTimeoutMs = 1e10, + CancellationToken cancellationToken = default) + => await ApplyRetryPolicy(token => armoniKClient_.CreateResultMetaDataAsync(sessionId, + resultNames, + 1, + 1e10, + token), + maxRetries, + TimeSpan.FromMilliseconds(totalTimeoutMs), + cancellationToken); + + internal async Task ApplyRetryPolicy(Func> action, + int maxRetries, + TimeSpan totalTimeout, + CancellationToken cancellationToken, + Func? resultValidator = null, + Func? exceptionHandlePredicate = null, + [CallerMemberName] string callerName = "") + { + var delays = GetRetryDelays(maxRetries, + totalTimeout); + + var policyResult = await Policy.HandleInner(RpcExceptionPredicate) + .OrResult(resultValidator ?? (_ => false)) + .OrInner(exceptionHandlePredicate ?? (_ => false)) + .WaitAndRetryAsync(delays, + (result, + span, + context) => + { + // TODO: use the context to return a more complete exception at the end. + logger_.LogError(result.Exception, + "Error during execution of {method}, will retry in {time}ms", + context.OperationKey, + span.TotalMilliseconds); + }) + .ExecuteAndCaptureAsync((_, + token) => action(token), + new Context(callerName), + cancellationToken); + + if (policyResult.Outcome == OutcomeType.Failure) + { + throw new ArmoniKException($""" + Call to {callerName} failed the retry policy. + Reason is: {policyResult.FaultType}. + See previous log for details. + """, + policyResult.FinalException); + } + + return policyResult.Result; + } + + internal async Task ApplyRetryPolicy(Func action, + int maxRetries, + TimeSpan totalTimeout, + CancellationToken cancellationToken, + Func? exceptionHandlePredicate = null, + [CallerMemberName] string callerName = "") + => await ApplyRetryPolicy(async token => + { + await action(token); + return true; + }, + maxRetries, + totalTimeout, + cancellationToken, + null, + exceptionHandlePredicate, + callerName); + + private bool RpcExceptionPredicate(RpcException rpcException) + => rpcException.StatusCode switch + { + // TODO: review carefully to ensure that no false negative is here + StatusCode.OK => true, + StatusCode.Cancelled => true, + StatusCode.Unknown => true, + StatusCode.InvalidArgument => false, + StatusCode.DeadlineExceeded => true, + StatusCode.NotFound => false, + StatusCode.PermissionDenied => false, + StatusCode.Unauthenticated => false, + StatusCode.ResourceExhausted => true, + StatusCode.FailedPrecondition => true, + StatusCode.Aborted => true, + StatusCode.OutOfRange => true, + StatusCode.Unimplemented => false, + StatusCode.Internal => true, + StatusCode.Unavailable => true, + StatusCode.DataLoss => true, + StatusCode.AlreadyExists => false, + _ => throw new InvalidOperationException(), + }; + + private static IEnumerable GetRetryDelays(int maxRetries, + TimeSpan totalTimeout) + { + if (maxRetries <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxRetries), + "number of retries should be strictly positive"); + } + + if (totalTimeout.TotalMilliseconds <= 0) + { + throw new ArgumentOutOfRangeException(nameof(totalTimeout), + "totalTimeout should be strictly positive"); + } + + var medianFirstRetryDelay = TimeSpan.FromMilliseconds(totalTimeout.TotalMilliseconds / Enumerable.Range(2, + maxRetries + 1) + .Sum(i => i * i)); + + var delays = Backoff.DecorrelatedJitterBackoffV2(medianFirstRetryDelay, + maxRetries, + null, + true); + return delays; + } +} diff --git a/Client/src/Common/Submitter/ApiExt/TaskDefinition.cs b/Client/src/Common/Submitter/ApiExt/TaskDefinition.cs new file mode 100644 index 00000000..d2e3a8cd --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/TaskDefinition.cs @@ -0,0 +1,90 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; + +using ArmoniK.Api.gRPC.V1; + +using Google.Protobuf; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// Contains the definition of an ArmoniK task +/// +/// The name of the payload +/// The task payload +/// The names of the other data required by the task. The task will not start without these data +/// The names of the data that should be produced by this task +[PublicAPI] +// TODO: should be in ArmoniK.Api +public record TaskDefinition(string PayloadName, + ByteString PayloadRawData, + IReadOnlyList Inputs, + IReadOnlyList Outputs) +{ + /// + /// + /// The name of the payload + /// The task payload + /// The names of the other data required by the task. The task will not start without these data + /// The names of the data that should be produced by this task + public TaskDefinition(string PayloadName, + ReadOnlyMemory Payload, + IReadOnlyList Inputs, + IReadOnlyList Outputs) + : this(PayloadName, + UnsafeByteOperations.UnsafeWrap(Payload), + Inputs, + Outputs) + { + } + + internal TaskDefinition(TaskRequest taskRequest) + : this(taskRequest.PayloadName, + taskRequest.Payload, + taskRequest.DataDependencies, + taskRequest.ExpectedOutputKeys) + { + } + + /// + /// Provide read-only access to the payload content. + /// + public ReadOnlyMemory Payload + => PayloadRawData.Memory; + + /// + /// Converts the current instance to a TaskRequest. + /// + /// + /// A TaskRequest instance with properties populated from the current instance. + /// + internal TaskRequest ToTaskRequest() + { + var output = new TaskRequest + { + Payload = PayloadRawData, + PayloadName = PayloadName, + }; + output.DataDependencies.Add(Inputs); + output.ExpectedOutputKeys.Add(Outputs); + return output; + } +} diff --git a/Client/src/Common/Submitter/ApiExt/TaskIdStatus.cs b/Client/src/Common/Submitter/ApiExt/TaskIdStatus.cs new file mode 100644 index 00000000..bf1654f3 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/TaskIdStatus.cs @@ -0,0 +1,40 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using ArmoniK.Api.gRPC.V1.Submitter; +using ArmoniK.DevelopmentKit.Client.Common.Status; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// +/// +/// +[PublicAPI] +public record TaskIdStatus(string TaskId, + ArmonikTaskStatusCode TaskStatus) +{ + /// + /// + /// + public TaskIdStatus(GetTaskStatusReply.Types.IdStatus idStatus) + : this(idStatus.TaskId, + idStatus.Status.ToArmonikStatusCode()) + { + } +} diff --git a/Client/src/Common/Submitter/ApiExt/TaskInfo.cs b/Client/src/Common/Submitter/ApiExt/TaskInfo.cs new file mode 100644 index 00000000..0278b9c3 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/TaskInfo.cs @@ -0,0 +1,46 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Collections.Generic; + +using ArmoniK.Api.gRPC.V1.Submitter; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +/// +/// Contains all the information about a submitted task. +/// +/// The Id of the task +/// The names of the other data required by the task. The task will not start without these data +/// The names of the data that should be produced by this task +/// The id of the payload +[PublicAPI] +// TODO: should be in ArmoniK.Api +public record TaskInfo(string TaskId, + IReadOnlyList Inputs, + IReadOnlyList Outputs, + string PayloadId) +{ + internal TaskInfo(CreateTaskReply.Types.TaskInfo taskInfo) + : this(taskInfo.TaskId, + taskInfo.DataDependencies, + taskInfo.ExpectedOutputKeys, + taskInfo.PayloadId) + { + } +} diff --git a/Client/src/Common/Submitter/ApiExt/TaskOutputIds.cs b/Client/src/Common/Submitter/ApiExt/TaskOutputIds.cs new file mode 100644 index 00000000..6ccb8a54 --- /dev/null +++ b/Client/src/Common/Submitter/ApiExt/TaskOutputIds.cs @@ -0,0 +1,34 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Collections.Generic; + +using ArmoniK.Api.gRPC.V1.Tasks; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +[PublicAPI] +public record TaskOutputIds(string TaskId, + IReadOnlyCollection OutputIds) +{ + public TaskOutputIds(GetResultIdsResponse.Types.MapTaskResult mapTaskResult) + : this(mapTaskResult.TaskId, + mapTaskResult.ResultIds) + { + } +} diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 9d471e0e..239f044d 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -16,19 +16,16 @@ using System; using System.Collections.Generic; -using System.Data; +using System.Collections.Immutable; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.Client.Submitter; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Results; -using ArmoniK.Api.gRPC.V1.Submitter; -using ArmoniK.Api.gRPC.V1.Tasks; using ArmoniK.DevelopmentKit.Client.Common.Status; +using ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.Common.Exceptions; using ArmoniK.Utils; @@ -41,8 +38,6 @@ using Microsoft.Extensions.Logging; -using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; - namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; /// @@ -84,14 +79,26 @@ protected BaseClientSubmitter(Properties properties, properties_ = properties; Logger = loggerFactory.CreateLogger(); chunkSubmitSize_ = chunkSubmitSize; + + var channelPool = ClientServiceConnector.ControlPlaneConnectionPool(properties_, + LoggerFactory); + + var grpcArmoniKClient = new GrpcArmoniKClient(() => channelPool.GetChannel()); + + ArmoniKClient = new RetryArmoniKClient(loggerFactory.CreateLogger(), + grpcArmoniKClient); + SessionId = session ?? CreateSession(new[] { TaskOptions.PartitionId, }); + } private ILoggerFactory LoggerFactory { get; } + protected IArmoniKClient ArmoniKClient { get; } + /// /// Set or Get TaskOptions with inside MaxDuration, Priority, AppName, VersionName and AppNamespace /// @@ -116,25 +123,27 @@ public ChannelPool ChannelPool protected ILogger Logger { get; } - private Session CreateSession(IEnumerable partitionIds) + + /// + public override string ToString() + => SessionId.Id ?? "Session_Not_ready"; + + private Session CreateSession(IReadOnlyCollection partitionIds) { using var _ = Logger.LogFunction(); Logger.LogDebug("Creating Session... "); - var createSessionRequest = new CreateSessionRequest - { - DefaultTaskOption = TaskOptions, - PartitionIds = - { - partitionIds, - }, - }; - var session = ChannelPool.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CreateSession(createSessionRequest)); + + var session = ArmoniKClient.CreateSessionAsync(TaskOptions, + partitionIds, + 5, + cancellationToken: CancellationToken.None) + .Result; Logger.LogDebug("Session Created {SessionId}", SessionId); return new Session { - Id = session.SessionId, + Id = session, }; } @@ -144,40 +153,27 @@ private Session CreateSession(IEnumerable partitionIds) /// /// The taskId of the task /// - public TaskStatus GetTaskStatus(string taskId) - { - var status = GetTaskStatues(taskId) - .Single(); - return status.Item2; - } - - /// - /// Returns the list status of the tasks - /// - /// The list of taskIds - /// - public IEnumerable> GetTaskStatues(params string[] taskIds) - => ChannelPool.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).GetTaskStatus(new GetTaskStatusRequest - { - TaskIds = - { - taskIds, - }, - }) - .IdStatuses.Select(x => Tuple.Create(x.TaskId, - x.Status))); + public ArmonikTaskStatusCode GetTaskStatus(string taskId) + => ArmoniKClient.GetTaskStatusAsync(new[] + { + taskId, + }, + 5, + cancellationToken: CancellationToken.None) + .Result.Single() + .TaskStatus; /// /// Return the taskOutput when error occurred /// /// /// - public Output GetTaskOutputInfo(string taskId) - => ChannelPool.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).TryGetTaskOutput(new TaskOutputRequest - { - TaskId = taskId, - Session = SessionId.Id, - })); + public string? TryGetTaskError(string taskId) + => ArmoniKClient.TryGetTaskErrorAsync(SessionId.Id, + taskId, + 5, + cancellationToken: CancellationToken.None) + .Result; /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -196,9 +192,9 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable payloadsWithDependencies.ToChunks(chunkSubmitSize_) - .SelectMany(chunk => ChunkSubmitTasksWithDependencies(chunk, - maxRetries, - taskOptions ?? TaskOptions)); + .SelectMany(chunk => SubmitTaskChunkWithDependencies(chunk, + maxRetries, + taskOptions ?? TaskOptions)); /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -226,13 +222,13 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable Guid.NewGuid() .ToString())); - return ChunkSubmitTasksWithDependencies(chunk.Zip(resultsMetadata, - (payloadWithDependencies, - metadata) => Tuple.Create(metadata.Value, - payloadWithDependencies.Item1, - payloadWithDependencies.Item2)), - maxRetries, - taskOptions ?? TaskOptions); + return SubmitTaskChunkWithDependencies(chunk.Zip(resultsMetadata, + (payloadWithDependencies, + metadata) => Tuple.Create(metadata.Value, + payloadWithDependencies.Item1, + payloadWithDependencies.Item2)), + maxRetries, + taskOptions ?? TaskOptions); }); @@ -248,45 +244,32 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable /// return the ids of the created tasks [PublicAPI] - private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable>> payloadsWithDependencies, - int maxRetries, - TaskOptions? taskOptions = null) + private IEnumerable SubmitTaskChunkWithDependencies(IEnumerable>> payloadsWithDependencies, + int maxRetries, + TaskOptions? taskOptions = null) { using var _ = Logger.LogFunction(); - var taskRequests = payloadsWithDependencies.Select(pwd => - { - var taskRequest = new TaskRequest - { - Payload = UnsafeByteOperations.UnsafeWrap(pwd.Item2), - }; - taskRequest.DataDependencies.AddRange(pwd.Item3); - taskRequest.ExpectedOutputKeys.Add(pwd.Item1); - return taskRequest; - }); + var taskDefinitions = payloadsWithDependencies.Select(tuple => new TaskDefinition("", + UnsafeByteOperations.UnsafeWrap(tuple.Item2), + tuple.Item3.ToArray(), + new[] + { + tuple.Item1, + })); for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { try { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - var response = submitterService.CreateTasksAsync(SessionId.Id, - taskOptions ?? TaskOptions, - // multiple enumeration only occurs in case of failure - // ReSharper disable once PossibleMultipleEnumeration - taskRequests) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); - return response.ResponseCase switch - { - CreateTaskReply.ResponseOneofCase.CreationStatusList => response.CreationStatusList.CreationStatuses.Select(status => status.TaskInfo.TaskId), - CreateTaskReply.ResponseOneofCase.None => throw new Exception("Issue with Server !"), - CreateTaskReply.ResponseOneofCase.Error => throw new Exception("Error while creating tasks !"), - _ => throw new InvalidOperationException(), - }; + return ArmoniKClient.SubmitTasksAsync(SessionId.Id, + // ReSharper disable once PossibleMultipleEnumeration + // Only occurs in case of retry + taskDefinitions, + taskOptions ?? TaskOptions, + 5, + cancellationToken: CancellationToken.None) + .Result.Select(info => info.TaskId); } catch (Exception e) { @@ -370,35 +353,25 @@ public void WaitForTasksCompletion(IEnumerable taskIds, { using var _ = Logger.LogFunction(); + // TODO: use RetryArmoniKClient instead of this code. Retry.WhileException(maxRetries, delayMs, retry => { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - if (retry > 1) { Logger.LogWarning("Try {try} for {funcName}", retry, - nameof(submitterService.WaitForCompletion)); + nameof(ArmoniKClient.WaitForCompletionAsync)); } - var __ = submitterService.WaitForCompletion(new WaitRequest - { - Filter = new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskIds, - }, - }, - }, - StopOnFirstTaskCancellation = true, - StopOnFirstTaskError = true, - }); + var __ = ArmoniKClient.WaitForCompletionAsync(SessionId.Id, + taskIds.ToList(), + true, + true, + 5, + cancellationToken: CancellationToken.None) + .Result; }, true, typeof(IOException), @@ -410,82 +383,59 @@ public void WaitForTasksCompletion(IEnumerable taskIds, /// /// Collection of task ids from which to retrieve results /// - /// A ResultCollection sorted by Status Completed, Result in Error or missing + /// A ResultCollection sorted by TaskStatus Completed, Result in Error or missing public ResultStatusCollection GetResultStatus(IEnumerable taskIds, CancellationToken cancellationToken = default) { var taskList = taskIds.ToList(); var mapTaskResults = GetResultIds(taskList); - var result2TaskDic = mapTaskResults.ToDictionary(result => result.ResultIds.Single(), + var result2TaskDic = mapTaskResults.ToDictionary(result => result.OutputIds.Single(), result => result.TaskId); - var missingTasks = taskList.Count > mapTaskResults.Count + var missingTasks = taskList.Count > result2TaskDic.Count ? taskList.Except(result2TaskDic.Values) .Select(tid => new ResultStatusData(string.Empty, tid, - ResultStatus.Notfound)) + ArmoniKResultStatus.Unknown)) : Array.Empty(); - var idStatus = Retry.WhileException(5, - 2000, - retry => - { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - Logger.LogDebug("Try {try} for {funcName}", - retry, - nameof(submitterService.GetResultStatus)); - // TODO: replace with submitterService.TryGetResultStream() => Issue # - var resultStatusReply = submitterService.GetResultStatus(new GetResultStatusRequest - { - ResultIds = - { - result2TaskDic.Keys, - }, - SessionId = SessionId.Id, - }); - return resultStatusReply.IdStatuses; - }, - true, - typeof(IOException), - typeof(RpcException)); - - var idsResultError = new List(); - var idsReady = new List(); - var idsNotReady = new List(); - - foreach (var idStatusPair in idStatus) - { - var resData = new ResultStatusData(idStatusPair.ResultId, - result2TaskDic[idStatusPair.ResultId], - idStatusPair.Status); - - switch (idStatusPair.Status) - { - case ResultStatus.Notfound: - continue; - case ResultStatus.Completed: - idsReady.Add(resData); - break; - case ResultStatus.Created: - idsNotReady.Add(resData); - break; - case ResultStatus.Unspecified: - case ResultStatus.Aborted: - default: - idsResultError.Add(resData); - break; - } - - result2TaskDic.Remove(idStatusPair.ResultId); - } - - var resultStatusList = new ResultStatusCollection(idsReady, - idsResultError, + // TODO: use RetryArmoniKClient instead of this code. + var idStatuses = Retry.WhileException(5, + 2000, + retry => + { + Logger.LogDebug("Try {try} for {funcName}", + retry, + nameof(ArmoniKClient.GetResultStatusAsync)); + return ArmoniKClient.GetResultStatusAsync(SessionId.Id, + result2TaskDic.Keys, + 5, + cancellationToken: cancellationToken) + .Result; + }, + true, + typeof(IOException), + typeof(RpcException)) + .Where(status => status.TaskStatus != ArmoniKResultStatus.Unknown) + .ToLookup(idStatus => idStatus.TaskStatus, + idStatus => + { + var taskId = result2TaskDic[idStatus.ResultId]; + result2TaskDic.Remove(idStatus.ResultId); + return new ResultStatusData(idStatus.ResultId, + taskId, + idStatus.TaskStatus); + }); + + + var resultStatusList = new ResultStatusCollection(idStatuses[ArmoniKResultStatus.Ready] + .ToImmutableList(), + idStatuses[ArmoniKResultStatus.Error] + .ToImmutableList(), result2TaskDic.Values.ToList(), - idsNotReady, + idStatuses[ArmoniKResultStatus.NotReady] + .ToImmutableList(), missingTasks.ToList()); return resultStatusList; @@ -496,7 +446,7 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, /// /// The list of task ids. /// A collection of map task results. - public ICollection GetResultIds(IEnumerable taskIds) + public IEnumerable GetResultIds(IEnumerable taskIds) => Retry.WhileException(5, 2000, retry => @@ -508,14 +458,10 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, nameof(GetResultIds)); } - return ChannelPool.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest - { - TaskId = - { - taskIds, - }, - }) - .TaskResults); + return ArmoniKClient.GetResultIdsAsync(taskIds.ToList(), + 5, + cancellationToken: CancellationToken.None) + .Result; }, true, typeof(IOException), @@ -540,7 +486,7 @@ public byte[] GetResult(string taskId, taskId, }) .Single() - .ResultIds.Single(); + .OutputIds.Single(); var resultRequest = new ResultRequest @@ -553,31 +499,10 @@ public byte[] GetResult(string taskId, 2000, retry => { - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - Logger.LogDebug("Try {try} for {funcName}", - retry, - nameof(submitterService.WaitForAvailability)); - // TODO: replace with submitterService.TryGetResultStream() => Issue # - var availabilityReply = submitterService.WaitForAvailability(resultRequest, - cancellationToken: cancellationToken); - - switch (availabilityReply.TypeCase) - { - case AvailabilityReply.TypeOneofCase.None: - throw new Exception("Issue with Server !"); - case AvailabilityReply.TypeOneofCase.Ok: - break; - case AvailabilityReply.TypeOneofCase.Error: - throw new - ClientResultsException($"Result in Error - {resultId}\nMessage :\n{string.Join("Inner message:\n", availabilityReply.Error.Errors)}", - resultId); - case AvailabilityReply.TypeOneofCase.NotCompletedTask: - throw new DataException($"Result {resultId} was not yet completed"); - default: - throw new InvalidOperationException(); - } + ArmoniKClient.WaitForAvailability(SessionId.Id, + resultId, + cancellationToken: cancellationToken) + .Wait(cancellationToken); }, true, typeof(IOException), @@ -585,9 +510,10 @@ public byte[] GetResult(string taskId, return Retry.WhileException(5, 200, - _ => TryGetResultAsync(resultRequest, - cancellationToken) - .Result, + _ => ArmoniKClient.DownloadResultAsync(SessionId.Id, + resultId, + cancellationToken: cancellationToken) + .Result, true, typeof(IOException), typeof(RpcException))!; @@ -630,73 +556,12 @@ public IEnumerable> GetResults(IEnumerable taskIds /// /// // TODO: return a compound type to avoid having a nullable that holds the information and return an empty array. + [Obsolete] public async Task TryGetResultAsync(ResultRequest resultRequest, CancellationToken cancellationToken = default) - { - List> chunks; - int len; - - using var channel = ChannelPool.GetChannel(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - { - using var streamingCall = submitterService.TryGetResultStream(resultRequest, - cancellationToken: cancellationToken); - chunks = new List>(); - len = 0; - var isPayloadComplete = false; - - while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) - { - var reply = streamingCall.ResponseStream.Current; - - switch (reply.TypeCase) - { - case ResultReply.TypeOneofCase.Result: - if (!reply.Result.DataComplete) - { - chunks.Add(reply.Result.Data.Memory); - len += reply.Result.Data.Memory.Length; - // In case we receive a chunk after the data complete message (corrupt stream) - isPayloadComplete = false; - } - else - { - isPayloadComplete = true; - } - - break; - case ResultReply.TypeOneofCase.None: - return null; - - case ResultReply.TypeOneofCase.Error: - throw new Exception($"Error in task {reply.Error.TaskId} {string.Join("Message is : ", reply.Error.Errors.Select(x => x.Detail))}"); - - case ResultReply.TypeOneofCase.NotCompletedTask: - return null; - - default: - throw new InvalidOperationException("Got a reply with an unexpected message type."); - } - } - - if (!isPayloadComplete) - { - throw new ClientResultsException($"Result data is incomplete for id {resultRequest.ResultId}"); - } - } - - var res = new byte[len]; - var idx = 0; - foreach (var rm in chunks) - { - rm.CopyTo(res.AsMemory(idx, - rm.Length)); - idx += rm.Length; - } - - return res; - } + => await ArmoniKClient.DownloadResultAsync(resultRequest.Session, + resultRequest.ResultId, + cancellationToken: cancellationToken); /// /// Try to find the result of One task. If there no result, the function return byte[0] @@ -732,13 +597,7 @@ public IEnumerable> GetResults(IEnumerable taskIds taskId, }) .Single() - .ResultIds.Single(); - - var resultRequest = new ResultRequest - { - ResultId = resultId, - Session = SessionId.Id, - }; + .OutputIds.Single(); var resultReply = Retry.WhileException(5, 2000, @@ -753,9 +612,10 @@ public IEnumerable> GetResults(IEnumerable taskIds try { - var response = TryGetResultAsync(resultRequest, - cancellationToken) - .Result; + var response = ArmoniKClient.DownloadResultAsync(SessionId.Id, + resultId, + cancellationToken: cancellationToken) + .Result; return response; } catch (AggregateException ex) @@ -833,8 +693,7 @@ public IList> TryGetResults(IList resultIds) var taskIdInError = resultStatus.IdsError.Any() ? resultStatus.IdsError[0] - : resultStatus.IdsResultError[0] - .TaskId; + : resultStatus.IdsResultError[0].TaskId; const string message = "The missing result is in error or canceled. " + "Please check log for more information on Armonik grid server list of taskIds in Error: [{taskList}]\n" + @@ -853,12 +712,9 @@ public IList> TryGetResults(IList resultIds) return resultStatus.IdsReady.Select(resultStatusData => { - var res = TryGetResultAsync(new ResultRequest - { - ResultId = resultStatusData.ResultId, - Session = SessionId.Id, - }) - .Result; + var res = ArmoniKClient.DownloadResultAsync(SessionId.Id, + resultStatusData.ResultId) + .Result; return res == null ? null : new Tuple(resultStatusData.TaskId, @@ -875,18 +731,9 @@ public IList> TryGetResults(IList resultIds) /// Results names /// Dictionary where each result name is associated with its result id [PublicAPI] - public Dictionary CreateResultsMetadata(IEnumerable resultNames) - => ChannelPool.WithChannel(c => new Results.ResultsClient(c).CreateResultsMetaData(new CreateResultsMetaDataRequest - { - SessionId = SessionId.Id, - Results = - { - resultNames.Select(name => new CreateResultsMetaDataRequest.Types.ResultCreate - { - Name = name, - }), - }, - })) - .Results.ToDictionary(r => r.Name, - r => r.ResultId); + public IDictionary CreateResultsMetadata(IEnumerable resultNames) + => ArmoniKClient.CreateResultMetaDataAsync(SessionId.Id, + resultNames.ToList()) + .Result.ToImmutableDictionary(pair => pair.Key, + pair => pair.Value); } diff --git a/Client/src/Common/Submitter/ChannelPool.cs b/Client/src/Common/Submitter/ChannelPool.cs index 9981a83d..3255fdbd 100644 --- a/Client/src/Common/Submitter/ChannelPool.cs +++ b/Client/src/Common/Submitter/ChannelPool.cs @@ -165,7 +165,7 @@ public ChannelGuard GetChannel() public T WithChannel(Func f) { using var channel = GetChannel(); - return f(channel); + return f(channel.Channel); } /// @@ -173,11 +173,6 @@ public T WithChannel(Func f) /// public sealed class ChannelGuard : IDisposable { - /// - /// Channel that is used by nobody else - /// - private readonly ChannelBase channel_; - private readonly ChannelPool pool_; /// @@ -186,20 +181,17 @@ public sealed class ChannelGuard : IDisposable /// public ChannelGuard(ChannelPool channelPool) { - pool_ = channelPool; - channel_ = channelPool.AcquireChannel(); + pool_ = channelPool; + Channel = channelPool.AcquireChannel(); } - /// - public void Dispose() - => pool_.ReleaseChannel(channel_); - /// - /// Implicit convert a ChannelGuard into a ChannelBase + /// Channel that is used by nobody else /// - /// ChannelGuard - /// ChannelBase - public static implicit operator ChannelBase(ChannelGuard guard) - => guard.channel_; + public ChannelBase Channel { get; } + + /// + public void Dispose() + => pool_.ReleaseChannel(Channel); } } diff --git a/Client/src/Common/Submitter/ClientServiceConnector.cs b/Client/src/Common/Submitter/ClientServiceConnector.cs index bc5be60d..a2793e56 100644 --- a/Client/src/Common/Submitter/ClientServiceConnector.cs +++ b/Client/src/Common/Submitter/ClientServiceConnector.cs @@ -49,12 +49,13 @@ public static ChannelPool ControlPlaneConnectionPool(Properties properties, OverrideTargetName = properties.TargetNameOverride, }; + // ReSharper disable once InvertIf if (properties.ControlPlaneUri.Scheme == Uri.UriSchemeHttps && options.AllowUnsafeConnection && string.IsNullOrEmpty(options.OverrideTargetName)) { #if NET5_0_OR_GREATER var doOverride = !string.IsNullOrEmpty(options.CaCert); #else - var doOverride = true; + const bool doOverride = true; #endif if (doOverride) { diff --git a/Client/src/Common/Submitter/ISubmitterService.cs b/Client/src/Common/Submitter/ISubmitterService.cs index 03e4eaa7..bb994a55 100644 --- a/Client/src/Common/Submitter/ISubmitterService.cs +++ b/Client/src/Common/Submitter/ISubmitterService.cs @@ -14,9 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -30,7 +28,7 @@ namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; /// Common Interface for Submitter services. /// [PublicAPI] -public interface ISubmitterService : IDisposable +public interface ISubmitterService { /// /// The Id of the current session @@ -118,68 +116,3 @@ public Task SubmitAsync(string methodName, TaskOptions? taskOptions = null, CancellationToken token = default); } - -/// -/// Provide extension methods for ISubmitterService -/// -[PublicAPI] -public static class SubmitterServiceExt -{ - /// - /// The method submit will execute task asynchronously on the server - /// - /// the ISubmitterService extended - /// The name of the method inside the service - /// A list of objects that can be passed in parameters of the function - /// The handler callBack implemented as IServiceInvocationHandler to get response or result or error - /// The number of retry before fail to submit task. Default = 5 retries - /// - /// TaskOptions argument to override default taskOptions in Session. - /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker - /// - /// Return the taskId string - public static string Submit(this ISubmitterService service, - string methodName, - object[] arguments, - IServiceInvocationHandler handler, - int maxRetries = 5, - TaskOptions? taskOptions = null) - => service.Submit(methodName, - new[] - { - arguments, - }, - handler, - maxRetries, - taskOptions) - .Single(); - - /// - /// The method submit will execute task asynchronously on the server - /// - /// the ISubmitterService extended - /// The name of the method inside the service - /// List of serialized arguments that will already serialize for MethodName. - /// The handler callBack implemented as IServiceInvocationHandler to get response or result or error - /// The number of retry before fail to submit task. Default = 5 retries - /// - /// TaskOptions argument to override default taskOptions in Session. - /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker - /// - /// Return the taskId string - public static string Submit(this ISubmitterService service, - string methodName, - byte[] arguments, - IServiceInvocationHandler handler, - int maxRetries = 5, - TaskOptions? taskOptions = null) - => service.Submit(methodName, - new[] - { - arguments, - }, - handler, - maxRetries, - taskOptions) - .Single(); -} diff --git a/Client/src/Common/Submitter/SubmitterServiceExt.cs b/Client/src/Common/Submitter/SubmitterServiceExt.cs new file mode 100644 index 00000000..570ec17d --- /dev/null +++ b/Client/src/Common/Submitter/SubmitterServiceExt.cs @@ -0,0 +1,88 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Linq; + +using ArmoniK.Api.gRPC.V1; + +using JetBrains.Annotations; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; + +/// +/// Provide extension methods for ISubmitterService +/// +[PublicAPI] +public static class SubmitterServiceExt +{ + /// + /// The method submit will execute task asynchronously on the server + /// + /// the ISubmitterService extended + /// The name of the method inside the service + /// A list of objects that can be passed in parameters of the function + /// The handler callBack implemented as IServiceInvocationHandler to get response or result or error + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// Return the taskId string + public static string Submit(this ISubmitterService service, + string methodName, + object[] arguments, + IServiceInvocationHandler handler, + int maxRetries = 5, + TaskOptions? taskOptions = null) + => service.Submit(methodName, + new[] + { + arguments, + }, + handler, + maxRetries, + taskOptions) + .Single(); + + /// + /// The method submit will execute task asynchronously on the server + /// + /// the ISubmitterService extended + /// The name of the method inside the service + /// List of serialized arguments that will already serialize for MethodName. + /// The handler callBack implemented as IServiceInvocationHandler to get response or result or error + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// Return the taskId string + public static string Submit(this ISubmitterService service, + string methodName, + byte[] arguments, + IServiceInvocationHandler handler, + int maxRetries = 5, + TaskOptions? taskOptions = null) + => service.Submit(methodName, + new[] + { + arguments, + }, + handler, + maxRetries, + taskOptions) + .Single(); +} diff --git a/Client/src/Symphony/ArmoniK.DevelopmentKit.Client.Symphony.csproj b/Client/src/Symphony/ArmoniK.DevelopmentKit.Client.Symphony.csproj index 41bb58e2..ddd7503d 100644 --- a/Client/src/Symphony/ArmoniK.DevelopmentKit.Client.Symphony.csproj +++ b/Client/src/Symphony/ArmoniK.DevelopmentKit.Client.Symphony.csproj @@ -1,7 +1,7 @@ - net472;net48;net5.0;net6.0 + net472;net48;netstandard2.0;net6.0 Library True true @@ -10,7 +10,7 @@ - + @@ -18,10 +18,6 @@ - - - - diff --git a/Client/src/Symphony/ArmonikSymphonyClient.cs b/Client/src/Symphony/ArmonikSymphonyClient.cs index 38bdeeef..4303c1e6 100644 --- a/Client/src/Symphony/ArmonikSymphonyClient.cs +++ b/Client/src/Symphony/ArmonikSymphonyClient.cs @@ -16,9 +16,10 @@ using ArmoniK.Api.gRPC.V1; using ArmoniK.DevelopmentKit.Client.Common; -using ArmoniK.DevelopmentKit.Client.Common.Submitter; using ArmoniK.DevelopmentKit.Common; +using JetBrains.Annotations; + using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -33,12 +34,9 @@ namespace ArmoniK.DevelopmentKit.Client.Symphony; /// Samples.ArmoniK.Sample.SymphonyClient /// [MarkDownDoc] +[PublicAPI] public class ArmonikSymphonyClient { - private readonly IConfigurationSection controlPlanSection_; - private readonly ILogger Logger; - - /// /// The ctor with IConfiguration and optional TaskOptions /// @@ -48,12 +46,8 @@ public ArmonikSymphonyClient(IConfiguration configuration, ILoggerFactory loggerFactory) { Configuration = configuration; - controlPlanSection_ = configuration.GetSection(SectionGrpc) - .Exists() - ? configuration.GetSection(SectionGrpc) - : null; LoggerFactory = loggerFactory; - Logger = loggerFactory.CreateLogger(); + loggerFactory.CreateLogger(); } private ILoggerFactory LoggerFactory { get; } @@ -63,8 +57,6 @@ public ArmonikSymphonyClient(IConfiguration configuration, /// public string SectionGrpc { get; set; } = "Grpc"; - private ChannelPool GrpcPool { get; set; } - private IConfiguration Configuration { get; } @@ -75,11 +67,8 @@ public ArmonikSymphonyClient(IConfiguration configuration, /// Returns the SessionService to submit, wait or get result public SessionService CreateSession(TaskOptions? taskOptions = null) { - ControlPlaneConnection(); - - var properties = new Properties(Configuration, - taskOptions); + taskOptions ?? SessionService.InitializeDefaultTaskOptions()); return new SessionService(properties, LoggerFactory, @@ -95,28 +84,12 @@ public SessionService CreateSession(TaskOptions? taskOptions = null) public SessionService OpenSession(Session sessionId, TaskOptions? taskOptions = null) { - ControlPlaneConnection(); - var properties = new Properties(Configuration, - taskOptions); + taskOptions ?? SessionService.InitializeDefaultTaskOptions()); return new SessionService(properties, LoggerFactory, taskOptions ?? SessionService.InitializeDefaultTaskOptions(), sessionId); } - - private void ControlPlaneConnection() - { - if (GrpcPool != null) - { - return; - } - - var properties = new Properties(Configuration, - new TaskOptions()); - - GrpcPool = ClientServiceConnector.ControlPlaneConnectionPool(properties, - LoggerFactory); - } } diff --git a/Client/src/Symphony/SessionService.cs b/Client/src/Symphony/SessionService.cs index 4053b210..fd462d46 100644 --- a/Client/src/Symphony/SessionService.cs +++ b/Client/src/Symphony/SessionService.cs @@ -25,6 +25,8 @@ using Google.Protobuf.WellKnownTypes; +using JetBrains.Annotations; + using Microsoft.Extensions.Logging; namespace ArmoniK.DevelopmentKit.Client.Symphony; @@ -40,6 +42,7 @@ public class SessionService : BaseClientSubmitter /// Ctor to instantiate a new SessionService /// This is an object to send task or get Results from a session /// + [PublicAPI] public SessionService(Properties properties, ILoggerFactory loggerFactory, TaskOptions? taskOptions = null, @@ -51,17 +54,11 @@ public SessionService(Properties properties, { } - /// Returns a string that represents the current object. - /// A string that represents the current object. - public override string ToString() - => SessionId.Id ?? "Session_Not_ready"; /// /// Default task options /// /// - // TODO: mark with [PublicApi] ? - // ReSharper disable once MemberCanBePrivate.Global public static TaskOptions InitializeDefaultTaskOptions() => new() { diff --git a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj index c66dce39..947e5e32 100644 --- a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj +++ b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj @@ -1,7 +1,7 @@ - net472;net48;net5.0;net6.0 + net472;net48;netstandard2.0;net6.0 Library True true diff --git a/Client/src/Unified/Services/SessionService.cs b/Client/src/Unified/Services/SessionService.cs index 14417e6c..fb19cb18 100644 --- a/Client/src/Unified/Services/SessionService.cs +++ b/Client/src/Unified/Services/SessionService.cs @@ -52,11 +52,6 @@ public SessionService(Properties properties, { } - - /// - public override string ToString() - => SessionId.Id ?? "Session_Not_ready"; - /// /// Supply a default TaskOptions /// @@ -77,7 +72,6 @@ public static TaskOptions InitializeDefaultTaskOptions() ApplicationName = "ArmoniK.DevelopmentKit.Worker.Unified", ApplicationVersion = "1.X.X", ApplicationNamespace = "ArmoniK.DevelopmentKit.Worker.Unified", - ApplicationService = "FallBackServerAdder", }; return taskOptions; diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index cde62bf8..9d505460 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -141,7 +141,7 @@ public Service(Properties properties, var ids = taskIds.ToList(); var mapTaskResults = SessionService.GetResultIds(ids); - var taskIdsResultIds = mapTaskResults.ToDictionary(result => result.ResultIds.Single(), + var taskIdsResultIds = mapTaskResults.ToDictionary(result => result.OutputIds.Single(), result => result.TaskId); @@ -505,18 +505,16 @@ private ServiceResult Execute(string methodName, var details = string.Empty; // ReSharper disable once InvertIf - if (status != TaskStatus.Completed) + if (status != ArmonikTaskStatusCode.TaskCompleted) { - var output = SessionService.GetTaskOutputInfo(taskId); - details = output.TypeCase == Output.TypeOneofCase.Error - ? output.Error.Details - : e.Message + e.StackTrace; + var output = SessionService.TryGetTaskError(taskId); + details = output ?? e.Message + e.StackTrace; } throw new ServiceInvocationException(e is AggregateException ? e.InnerException ?? e : e, - status.ToArmonikStatusCode()) + status) { OutputDetails = details, }; @@ -533,12 +531,12 @@ private ServiceResult Execute(string methodName, /// The action to take when a response is received. /// The action to take when an error occurs. /// The size of the chunk to retrieve results in. - private void ProxyTryGetResults(IEnumerable taskIds, - Action responseHandler, - Action errorHandler, - int chunkResultSize = 200) + private void ProxyTryGetResults(IEnumerable taskIds, + Action responseHandler, + Action errorHandler, + int chunkResultSize = 200) { - var missing = taskIds.ToHashSet(); + var missing = new HashSet(taskIds); var holdPrev = missing.Count; var waitInSeconds = new List { @@ -596,7 +594,7 @@ private void ProxyTryGetResults(IEnumerable taskIds, try { errorHandler(resultStatusData.TaskId, - TaskStatus.Error, + ArmonikTaskStatusCode.TaskFailed, e.Message + e.StackTrace); } catch (Exception e2) @@ -618,15 +616,12 @@ private void ProxyTryGetResults(IEnumerable taskIds, switch (taskStatus) { - case TaskStatus.Cancelling: - case TaskStatus.Cancelled: + case ArmonikTaskStatusCode.TaskCancelled: details = $"Task {resultStatusData.TaskId} was canceled"; break; default: - var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId); - details = outputInfo.TypeCase == Output.TypeOneofCase.Error - ? outputInfo.Error.Details - : "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; + var outputInfo = SessionService.TryGetTaskError(resultStatusData.TaskId); + details = outputInfo ?? "Result is in status : " + resultStatusData.Status + ", look for task in error in logs."; break; } @@ -656,7 +651,7 @@ private void ProxyTryGetResults(IEnumerable taskIds, try { errorHandler(resultStatusData.TaskId, - TaskStatus.Unspecified, + ArmonikTaskStatusCode.Unknown, "Task is missing"); } catch (Exception e) @@ -712,7 +707,7 @@ private void ResultTask() } catch (Exception e) { - const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown; + const ArmonikTaskStatusCode statusCode = ArmonikTaskStatusCode.Unknown; ServiceInvocationException ex; @@ -750,7 +745,7 @@ private void ResultTask() { try { - var statusCode = taskStatus.ToArmonikStatusCode(); + var statusCode = taskStatus; ResultHandlerDictionary[taskId] .HandleError(new ServiceInvocationException(ex, @@ -784,16 +779,6 @@ private void ResultTask() } } - - /// - /// Get a new channel to communicate with the control plane - /// - /// gRPC channel - // TODO: Refactor test to remove this - // ReSharper disable once UnusedMember.Global - public ChannelBase GetChannel() - => SessionService.ChannelPool.GetChannel(); - /// /// Class to return TaskId and the result /// diff --git a/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/ArmoniK.DevelopmentKit.Client.Common.Tests.csproj b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/ArmoniK.DevelopmentKit.Client.Common.Tests.csproj new file mode 100644 index 00000000..3a232c06 --- /dev/null +++ b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/ArmoniK.DevelopmentKit.Client.Common.Tests.csproj @@ -0,0 +1,29 @@ + + + + net472;net48;net5.0;net6.0;net7.0 + enable + + false + true + true + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + diff --git a/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/Directory.Build.props b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/Directory.Build.props new file mode 100644 index 00000000..8f3d82c0 --- /dev/null +++ b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/Directory.Build.props @@ -0,0 +1,11 @@ + + + + ../../../ + + + + + + + diff --git a/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/RetrySubmitterApplyPolicyTests.cs b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/RetrySubmitterApplyPolicyTests.cs new file mode 100644 index 00000000..22e15ccc --- /dev/null +++ b/Client/tests/ArmoniK.DevelopmentKit.Client.Common.Tests/RetrySubmitterApplyPolicyTests.cs @@ -0,0 +1,400 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-$CURRENT_YEAR$. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt; + +using Grpc.Core; + +using Microsoft.Extensions.Logging.Abstractions; + +using Moq; + +using NUnit.Framework; + +namespace ArmoniK.DevelopmentKit.Client.Common.Tests; + +/// +/// Tests the RetryArmoniKClient class +/// +[TestFixture] +public class RetrySubmitterApplyPolicyTests +{ + /// + /// MaxRetries should be strictly positive + /// + [Test] + public void NegativeMaxRetriesTest() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => Task.CompletedTask, + -1, + TimeSpan.FromMinutes(1), + CancellationToken.None)); + } + + /// + /// MaxRetries should be strictly positive + /// + [Test] + public void ZeroMaxRetriesTest() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => Task.CompletedTask, + 0, + TimeSpan.FromMinutes(1), + CancellationToken.None)); + } + + /// + /// Timeout should be strictly positive + /// + [Test] + public void NegativeTimeoutTest() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => Task.CompletedTask, + 1, + TimeSpan.FromMinutes(-1), + CancellationToken.None)); + } + + /// + /// Timeout should be strictly positive + /// + [Test] + public void ZeroTimeoutTest() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => Task.CompletedTask, + 1, + TimeSpan.FromMinutes(0), + CancellationToken.None)); + } + + /// + /// Happy flow scenario + /// + [Test] + public async Task ShouldSendResultIfNoError() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + + var result = await retrySubmitterClient.ApplyRetryPolicy(_ => Task.FromResult(source), + 1, + TimeSpan.FromSeconds(1), + CancellationToken.None); + + Assert.That(result, + Is.Not.Null); + Assert.That(result, + Is.SameAs(source)); + } + + /// + /// First call throws an RpcException but is handled by the policy and the proper result is returned. + /// + [Test] + public async Task ShouldSendResultIfOneRpcException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + var result = await retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new RpcException(Grpc.Core.Status.DefaultCancelled); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None); + Assert.Multiple(() => + { + Assert.That(result, + Is.Not.Null); + Assert.That(result, + Is.SameAs(source)); + Assert.That(calls, + Is.EqualTo(2)); + }); + } + + /// + /// First call throws an RpcException but is handled by the policy and the proper result is returned. + /// + [Test] + public async Task ShouldSendResultIfOneInnerRpcException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + var result = await retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new Exception("", + new RpcException(Grpc.Core.Status.DefaultCancelled)); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None); + Assert.Multiple(() => + { + Assert.That(result, + Is.Not.Null); + Assert.That(result, + Is.SameAs(source)); + Assert.That(calls, + Is.EqualTo(2)); + }); + } + + /// + /// First call throws an IOException but is handled by the policy and the proper result is returned. + /// + [Test] + public void ShouldThrowIfOneIOException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new IOException(); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None)); + + Assert.Multiple(() => + { + Assert.That(calls, + Is.EqualTo(1)); + }); + } + + /// + /// First call throws an IOException but is handled by the policy and the proper result is returned. + /// + [Test] + public async Task ShouldSendResultIfOneIOException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + var result = await retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new IOException(); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None, + null, + exception => exception is IOException); + Assert.Multiple(() => + { + Assert.That(result, + Is.Not.Null); + Assert.That(result, + Is.SameAs(source)); + Assert.That(calls, + Is.EqualTo(2)); + }); + } + + /// + /// First call throws an IOException but is handled by the policy and the proper result is returned. + /// + [Test] + public void ShouldThrowIfOneInnerIOException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new Exception("", + new IOException()); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None)); + + Assert.Multiple(() => + { + Assert.That(calls, + Is.EqualTo(1)); + }); + } + + /// + /// First call throws an IOException but is handled by the policy and the proper result is returned. + /// + [Test] + public async Task ShouldSendResultIfOneInnerIOException() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var source = new object(); + var calls = 0; + + var result = await retrySubmitterClient.ApplyRetryPolicy(_ => + { + calls++; + if (calls == 1) + { + throw new Exception("", + new IOException()); + } + + return Task.FromResult(source); + }, + 5, + TimeSpan.FromSeconds(1), + CancellationToken.None, + null, + exception => exception is IOException); + Assert.Multiple(() => + { + Assert.That(result, + Is.Not.Null); + Assert.That(result, + Is.SameAs(source)); + Assert.That(calls, + Is.EqualTo(2)); + }); + } + + /// + /// First call throws an IOException but is handled by the policy and the proper result is returned. + /// + [Test] + public void ShouldThrowAfterSeveralMaxRetries() + { + var logger = NullLogger.Instance; + var submitterClient = new Mock().Object; + + var retrySubmitterClient = new RetryArmoniKClient(logger, + submitterClient); + + var calls = 0; + + Assert.ThrowsAsync(() => retrySubmitterClient.ApplyRetryPolicy>(_ => + { + calls++; + throw new Exception("", + new RpcException(Grpc.Core.Status + .DefaultSuccess)); + }, + 5, + TimeSpan.FromMilliseconds(10), + CancellationToken.None)); + Assert.That(calls, + Is.EqualTo(6)); // 1 first attempt + 5 retries + } +} diff --git a/Common/src/Common/AppsOptions.cs b/Common/src/Common/AppsOptions.cs index 5d12b947..217df221 100644 --- a/Common/src/Common/AppsOptions.cs +++ b/Common/src/Common/AppsOptions.cs @@ -14,7 +14,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma warning disable CS1591 namespace ArmoniK.DevelopmentKit.Common; [MarkDownDoc] diff --git a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj index 366dfff9..b9a1476a 100644 --- a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj +++ b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj @@ -1,7 +1,7 @@ - net472;net48;netstandard2.0;net5.0;net6.0 + net472;net48;netstandard2.0;net6.0 Library True true @@ -10,7 +10,7 @@ - + diff --git a/Common/src/Common/EngineType.cs b/Common/src/Common/EngineType.cs index ff4febac..a0ed6c2a 100644 --- a/Common/src/Common/EngineType.cs +++ b/Common/src/Common/EngineType.cs @@ -77,9 +77,10 @@ public string this[EngineType key] { get { - if (engineTypes_.ContainsKey(key)) + if (engineTypes_.TryGetValue(key, + out var item)) { - return engineTypes_[key]; + return item; } throw new KeyNotFoundException($"There is no engine type [{key}]"); diff --git a/Common/src/Common/Extensions/IEnumerableExt.cs b/Common/src/Common/Extensions/IEnumerableExt.cs index 10df2c4c..79ffd6df 100644 --- a/Common/src/Common/Extensions/IEnumerableExt.cs +++ b/Common/src/Common/Extensions/IEnumerableExt.cs @@ -23,7 +23,7 @@ namespace ArmoniK.DevelopmentKit.Common.Extensions; /// /// Convert IEnumerable byte to IEnumerable double /// -public static class IEnumerableExt +public static class EnumerableExt { /// /// Convert IEnumerable byte to IEnumerable double diff --git a/Common/src/Common/MarkDownDocAttribute.cs b/Common/src/Common/MarkDownDocAttribute.cs index a93d81a3..d57abac6 100644 --- a/Common/src/Common/MarkDownDocAttribute.cs +++ b/Common/src/Common/MarkDownDocAttribute.cs @@ -1,4 +1,4 @@ -// This file is part of the ArmoniK project +// This file is part of the ArmoniK project // // Copyright (C) ANEO, 2021-2023.All rights reserved. // diff --git a/Common/src/Common/Utils/Either.cs b/Common/src/Common/Utils/Either.cs index ea33db48..45b1d723 100644 --- a/Common/src/Common/Utils/Either.cs +++ b/Common/src/Common/Utils/Either.cs @@ -24,19 +24,20 @@ namespace ArmoniK.DevelopmentKit.Common.Utils; /// /// Represents a simple Either type to manage an object of type L or an exception of type R. /// -/// The object in the Either. -/// The exception in the Either. -public class Either +/// The object in the Either. +/// The exception in the Either. +public class Either + where TException : Exception { /// /// The exception in the Either. /// - private readonly R exception_; + private readonly TException? exception_; /// /// The object in the Either. /// - private readonly L obj_; + private readonly TL? obj_; /// /// The status of the Either. @@ -56,7 +57,7 @@ public Either() /// Constructs an Either with an object. /// /// The object to be stored in the Either. - public Either(L obj) + public Either(TL obj) { obj_ = obj; exception_ = default; @@ -67,7 +68,7 @@ public Either(L obj) /// Constructs an Either with an exception. /// /// The exception to be stored in the Either. - public Either(R exception) + public Either(TException exception) { exception_ = exception; obj_ = default; @@ -79,23 +80,23 @@ public Either(R exception) /// /// The Either to be converted. /// The exception stored in the Either. - public static explicit operator R(Either ma) - => ma.exception_; + public static explicit operator TException(Either ma) + => ma.exception_!; /// /// Implicitly converts the Either to an object. /// /// The Either to be converted. /// The object stored in the Either. - public static explicit operator L(Either ma) - => ma.obj_; + public static explicit operator TL(Either ma) + => ma.obj_ ?? throw ma.exception_!; /// /// Implicitly converts an object to an Either. /// /// The object to be converted. /// An Either containing the object. - public static implicit operator Either(L ma) + public static implicit operator Either(TL ma) => new(ma); /// @@ -103,7 +104,7 @@ public static implicit operator Either(L ma) /// /// The exception to be converted. /// An Either containing the exception. - public static implicit operator Either(R ma) + public static implicit operator Either(TException ma) => new(ma); /// @@ -111,15 +112,15 @@ public static implicit operator Either(R ma) /// /// The action to be executed. /// The object stored in the Either. - public L IfRight(Action action) + public TL? IfRight(Action action) { if (status_ == EitherStatus.Right) { - action(exception_); + action(exception_!); } else { - return obj_; + return obj_!; } return default; diff --git a/Tests/ArmoniK.DevelopmentKit.Common.Tests/ArmoniK.DevelopmentKit.Common.Tests.csproj b/Tests/ArmoniK.DevelopmentKit.Common.Tests/ArmoniK.DevelopmentKit.Common.Tests.csproj index 04fcc949..359a0e5c 100644 --- a/Tests/ArmoniK.DevelopmentKit.Common.Tests/ArmoniK.DevelopmentKit.Common.Tests.csproj +++ b/Tests/ArmoniK.DevelopmentKit.Common.Tests/ArmoniK.DevelopmentKit.Common.Tests.csproj @@ -11,7 +11,7 @@ - + diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/ArmoniK.EndToEndTests.Client.csproj b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/ArmoniK.EndToEndTests.Client.csproj index 6fc00827..15aab390 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/ArmoniK.EndToEndTests.Client.csproj +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/ArmoniK.EndToEndTests.Client.csproj @@ -27,7 +27,7 @@ - + diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Program.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Program.cs index 77d8b384..c1c2be8e 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Program.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Program.cs @@ -27,6 +27,7 @@ using Serilog; using Serilog.Extensions.Logging; +using Serilog.Sinks.SystemConsole.Themes; namespace ArmoniK.EndToEndTests.Client; @@ -39,6 +40,7 @@ public class Program private static void Main(string[] args) { Console.WriteLine("Hello Armonik End to End Tests !"); + Console.WriteLine($"These tests require {typeof(AnsiConsoleTheme).Assembly.FullName}"); var builder = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()) diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs index 1d0b7e4a..e37700a4 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs @@ -59,10 +59,10 @@ public class AggregationPriorityTest /// /// numbers_ is an array of double values. /// - private readonly double[] numbers_ = Enumerable.Range(0, - 10) - .Select(i => (double)i) - .ToArray(); + private readonly double[] numbers_ = System.Linq.Enumerable.Range(0, + 10) + .Select(i => (double)i) + .ToArray(); /// /// unifiedTestHelper_ is an instance of UnifiedTestHelper class. @@ -156,7 +156,7 @@ private async Task> GetDistribution(int nRows) var taskRawData = new List(); - await foreach (var taskRaw in RetrieveAllTasksStats(service.GetChannel(), + await foreach (var taskRaw in RetrieveAllTasksStats(null, new Filters { Or = @@ -248,7 +248,7 @@ private async Task> GetDistribution(int nRows) /// /// The sessionId to retrieve the results from. /// The taskIds to retrieve the results for. - /// A of the results. + /// A IEnumerable{Tuple{string, byte[]}} of the results. private IEnumerable> WaitForResults(string sessionId, IEnumerable taskIds) { @@ -276,7 +276,7 @@ private IEnumerable> WaitForResults(string se //TODO Fix issue GetResultIds return MapTaskResult can be N result for N TaskId since parentTaskIds can be requested var mapTaskResults = taskList.ToChunks(200) .SelectMany(b => service.SessionService.GetResultIds(b)) - .Select(mp => (mp.ResultIds, mp.TaskId)) + .Select(mp => (mp.OutputIds, mp.TaskId)) .ToList(); var dic = mapTaskResults.GroupBy(taskResult => taskResult.Item1.First()) .ToDictionary(group => group.Key, @@ -354,7 +354,7 @@ private IEnumerable> WaitForResults(string se public void Check_That_Result_has_expected_value(int squareMatrixSize) { unifiedTestHelper_.Log.LogInformation($"Compute square matrix with n = {squareMatrixSize}"); - unifiedTestHelper_.Log.LogInformation($"Duplicating {squareMatrixSize} Rows with vector {string.Join(", ", Enumerable.Range(0, squareMatrixSize))}"); + unifiedTestHelper_.Log.LogInformation($"Duplicating {squareMatrixSize} Rows with vector {string.Join(", ", System.Linq.Enumerable.Range(0, squareMatrixSize))}"); var taskId = unifiedTestHelper_.Service.Submit("ComputeMatrix", UnitTestHelperBase.ParamsHelper(squareMatrixSize), @@ -373,16 +373,20 @@ public void Check_That_Result_has_expected_value(int squareMatrixSize) var taskResult = TaskResult.Deserialize(deprot[0] as byte[]); unifiedTestHelper_.Log.LogInformation($"Result of Matrix formula : {taskResult.Result}"); - var sum = Enumerable.Range(0, - squareMatrixSize) - .Aggregate(0.0, - (current, - scalar) => current + scalar * scalar); + var sum = System.Linq.Enumerable.Range(0, + squareMatrixSize) + .Aggregate(0.0, + (current, + scalar) => current + scalar * scalar); Assert.That(sum * squareMatrixSize, Is.EqualTo(taskResult.Result)); - var _ = GetDistribution(squareMatrixSize) - .Result; + if (false) + // ReSharper disable once HeuristicUnreachableCode + { + var _ = GetDistribution(squareMatrixSize) + .Result; + } } } diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClient.cs index 6221c579..96f08d51 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClient.cs @@ -49,10 +49,10 @@ public override void EntryPoint() var client = new ArmonikSymphonyClient(Configuration, LoggerFactory); Log.LogInformation("------ Start 2 Sessions with 100 tasks -------"); - var payloadsTasks = Enumerable.Range(1, - 2) - .Select(idx => new Task(() => ClientStartup(client, - idx))); + var payloadsTasks = System.Linq.Enumerable.Range(1, + 2) + .Select(idx => new Task(() => ClientStartup(client, + idx))); var tasks = payloadsTasks.ToList(); tasks.AsParallel() .ForAll(t => t.Start()); @@ -135,9 +135,9 @@ private void ClientStartup(ArmonikSymphonyClient client, var sessionService = client.CreateSession(taskOptions); - var payloads = Enumerable.Repeat(0, - 100) - .Select(_ => clientPayload.Serialize()); + var payloads = System.Linq.Enumerable.Repeat(0, + 100) + .Select(_ => clientPayload.Serialize()); var taskIds = sessionService.SubmitTasks(payloads); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClientTest.cs index 3a49f19f..7172e7fd 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckRandomException/RandomExceptionSymClientTest.cs @@ -50,10 +50,10 @@ public void Check_That_Exceptions_In_A_Session_Does_Not_Affect_The_Other_Session Type = ClientPayload.TaskType.Expm1, }.Serialize(); - var payloadsTasks = Enumerable.Range(1, - 2) - .Select(_ => new Task(() => SendTaskAndGetErrorCount(clientPayload))) - .ToArray(); + var payloadsTasks = System.Linq.Enumerable.Range(1, + 2) + .Select(_ => new Task(() => SendTaskAndGetErrorCount(clientPayload))) + .ToArray(); payloadsTasks.AsParallel() .ForAll(t => t.Start()); @@ -72,9 +72,9 @@ public int SendTaskAndGetErrorCount(byte[] clientPayload) var symphonyTestHelper = new SymphonyTestHelper(ApplicationNamespace, ApplicationService); - var payloads = Enumerable.Repeat(0, - 50) - .Select(_ => clientPayload); + var payloads = System.Linq.Enumerable.Repeat(0, + 50) + .Select(_ => clientPayload); var taskIds = symphonyTestHelper.SessionService.SubmitTasks(payloads); //var taskResults = symphonyTestHelper.WaitForTasksResult(0, taskIds).ToList(); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClient.cs index 486167a2..2e6bc117 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClient.cs @@ -97,8 +97,8 @@ public override void EntryPoint() Configuration.GetSection("Grpc")["EndPoint"], 5001); - using var cs = ServiceFactory.CreateService(props, - LoggerFactory); + var cs = ServiceFactory.CreateService(props, + LoggerFactory); Log.LogInformation("Running End to End test to compute Sum of numbers with subtasking"); SumNumbersWithSubtasking(cs); @@ -121,9 +121,9 @@ private void SumNumbersWithSubtasking(ISubmitterService sessionService, int subtaskSplitCount = 2) { Log.LogInformation($"Launching Sum of numbers 1 to {maxNumberToSum}"); - var numbers = Enumerable.Range(1, - maxNumberToSum) - .ToList(); + var numbers = System.Linq.Enumerable.Range(1, + maxNumberToSum) + .ToList(); var payload = new ClientPayload { IsRootTask = true, diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClientTest.cs index b6e73e9b..f100df4a 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTreeUnifiedApi/SubtaskingTreeUnifiedApiClientTest.cs @@ -56,9 +56,9 @@ public void Check_That_Subtasking_Is_Working_With_Unified_SDK(int maxNumberToSum int subtaskSplitCount) { unifiedTestHelper_.Log.LogInformation($"Launching Sum of numbers 1 to {maxNumberToSum}"); - var numbers = Enumerable.Range(1, - maxNumberToSum) - .ToList(); + var numbers = System.Linq.Enumerable.Range(1, + maxNumberToSum) + .ToList(); var payload = new ClientPayload { IsRootTask = true, diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClient.cs index 102e9fd1..7ef6b4dd 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClient.cs @@ -74,9 +74,9 @@ private static void ExecuteTreeSubtasking(SessionService sessionService, int maxNumberToSum = 10, int subtaskSplitCount = 2) { - var numbers = Enumerable.Range(1, - maxNumberToSum) - .ToList(); + var numbers = System.Linq.Enumerable.Range(1, + maxNumberToSum) + .ToList(); var payload = new ClientPayload { IsRootTask = true, diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClientTest.cs index 1eb027e0..99a19fb8 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckSubtaskingTree_SymphonySDK/SubtaskingTreeClientTest.cs @@ -53,9 +53,9 @@ public void Cleanup() public void Check_That_Subtasking_Is_Working_With_Symphony_SDK(int maxNumberToSum, int subtaskSplitCount) { - var numbers = Enumerable.Range(1, - maxNumberToSum) - .ToList(); + var numbers = System.Linq.Enumerable.Range(1, + maxNumberToSum) + .ToList(); var expectedResult = numbers.Sum(elem => (long)elem); var payload = new ClientPayload diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClient.cs index 6772d137..525be569 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClient.cs @@ -48,10 +48,10 @@ public override void EntryPoint() var client = new ArmonikSymphonyClient(Configuration, LoggerFactory); Log.LogInformation("------ Start 2 Sessions with 100 tasks -------"); - var payloadsTasks = Enumerable.Range(1, - 2) - .Select(idx => new Task(() => ClientStartup(client, - idx))); + var payloadsTasks = System.Linq.Enumerable.Range(1, + 2) + .Select(idx => new Task(() => ClientStartup(client, + idx))); var tasks = payloadsTasks.ToList(); tasks.AsParallel() .ForAll(t => t.Start()); @@ -122,9 +122,9 @@ private void ClientStartup(ArmonikSymphonyClient client, var sessionService = client.CreateSession(taskOptions); - var payloads = Enumerable.Repeat(0, - 100) - .Select(_ => clientPayload.Serialize()); + var payloads = System.Linq.Enumerable.Repeat(0, + 100) + .Select(_ => clientPayload.Serialize()); var taskIds = sessionService.SubmitTasks(payloads); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClientTest.cs index 1f8c2697..919d7ff5 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckTryGetResults/CheckTryGetResultClientTest.cs @@ -46,19 +46,19 @@ public void Cleanup() [Test] public void Check_That_We_Can_Launch_tasks_In_Multiple_Sessions_At_The_Same_Time() { - var numbersToSquareReduce = Enumerable.Range(1, - 3) - .ToList(); + var numbersToSquareReduce = System.Linq.Enumerable.Range(1, + 3) + .ToList(); var clientPayload = new ClientPayload { Type = ClientPayload.TaskType.ComputeCube, Numbers = numbersToSquareReduce, }.Serialize(); var expectedResult = numbersToSquareReduce.Sum(x => x * x * x); - var payloadsTasks = Enumerable.Range(1, - 2) - .Select(elem => new Task>(() => SendTaskAndGetPayloadResults(clientPayload))) - .ToArray(); + var payloadsTasks = System.Linq.Enumerable.Range(1, + 2) + .Select(elem => new Task>(() => SendTaskAndGetPayloadResults(clientPayload))) + .ToArray(); payloadsTasks.AsParallel() .ForAll(t => t.Start()); Task.WaitAll(payloadsTasks); @@ -77,9 +77,9 @@ public IEnumerable SendTaskAndGetPayloadResults(byte[] clientPayl var symphonyTestHelper = new SymphonyTestHelper(ApplicationNamespace, ApplicationService); - var payloads = Enumerable.Repeat(0, - 20) - .Select(_ => clientPayload); + var payloads = System.Linq.Enumerable.Repeat(0, + 20) + .Select(_ => clientPayload); var taskIds = symphonyTestHelper.SessionService.SubmitTasks(payloads); var taskResults = symphonyTestHelper.WaitForTaskResults(taskIds) .ToList(); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminClientTest.cs index e506336e..1f447785 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminClientTest.cs @@ -30,10 +30,10 @@ public class SimpleUnifiedApiAdminClientTest private const string ApplicationNamespace = "ArmoniK.EndToEndTests.Worker.Tests.CheckUnifiedApi"; private const string ApplicationService = "CheckUnifiedApiWorker"; - private readonly double[] numbers_ = Enumerable.Range(0, - 10) - .Select(i => (double)i) - .ToArray(); + private readonly double[] numbers_ = System.Linq.Enumerable.Range(0, + 10) + .Select(i => (double)i) + .ToArray(); private UnifiedTestHelper unifiedTestHelper_; @@ -54,9 +54,9 @@ public void Check_That_CancelSession_Is_Working() { const int wantedCount = 100; var tasks = unifiedTestHelper_.Service.Submit("ComputeBasicArrayCube", - Enumerable.Range(1, - wantedCount) - .Select(_ => UnitTestHelperBase.ParamsHelper(numbers_)), + System.Linq.Enumerable.Range(1, + wantedCount) + .Select(_ => UnitTestHelperBase.ParamsHelper(numbers_)), unifiedTestHelper_); if (tasks.Count() is var count && count != wantedCount) { @@ -78,9 +78,9 @@ public void Check_TaskIdListing() { const int wantedCount = 100; var tasks = unifiedTestHelper_.Service.Submit("ComputeBasicArrayCube", - Enumerable.Range(1, - wantedCount) - .Select(_ => UnitTestHelperBase.ParamsHelper(numbers_)), + System.Linq.Enumerable.Range(1, + wantedCount) + .Select(_ => UnitTestHelperBase.ParamsHelper(numbers_)), unifiedTestHelper_); if (tasks.Count() is var count && count != wantedCount) { diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminTestClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminTestClient.cs index 3f5eb874..549b2f79 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminTestClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIAdminTestClient.cs @@ -52,13 +52,13 @@ public SimpleUnifiedApiAdminTestClient(IConfiguration configuration, public void HandleError(ServiceInvocationException e, string taskId) { - if (e.StatusCode == ArmonikStatusCode.TaskCancelled) + if (e.TaskStatusCode == ArmonikTaskStatusCode.TaskCancelled) { - Log.LogWarning($"Task canceled : {taskId}. Status {e.StatusCode.ToString()} Message : {e.Message}\nDetails : {e.OutputDetails}"); + Log.LogWarning($"Task canceled : {taskId}. TaskStatus {e.TaskStatusCode.ToString()} Message : {e.Message}\nDetails : {e.OutputDetails}"); } else { - Log.LogError($"Fail to get result from {taskId}. Status {e.StatusCode.ToString()} Message : {e.Message}\nDetails : {e.OutputDetails}"); + Log.LogError($"Fail to get result from {taskId}. TaskStatus {e.TaskStatusCode.ToString()} Message : {e.Message}\nDetails : {e.OutputDetails}"); throw new ApplicationException($"Error from {taskId}", e); @@ -103,8 +103,8 @@ public override void EntryPoint() //var resourceId = ServiceAdmin.CreateInstance(Configuration, LoggerFactory,props).UploadResource("filePath"); - using var cs = ServiceFactory.CreateService(props, - LoggerFactory); + var cs = ServiceFactory.CreateService(props, + LoggerFactory); using var csa = ServiceFactory.GetServiceAdmin(props, LoggerFactory); @@ -148,9 +148,9 @@ private void RunningAndCancelSession(ISubmitterService sessionService, }.ToArray(); const int wantedCount = 100; var tasks = sessionService.Submit("ComputeBasicArrayCube", - Enumerable.Range(1, - wantedCount) - .Select(_ => ParamsHelper(numbers)), + System.Linq.Enumerable.Range(1, + wantedCount) + .Select(_ => ParamsHelper(numbers)), this); if (tasks.Count() is var count && count != wantedCount) { diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIClientTest.cs index c8b05027..4f764ebc 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPIClientTest.cs @@ -30,10 +30,10 @@ public class SimpleUnifiedApiClientTest private const string ApplicationNamespace = "ArmoniK.EndToEndTests.Worker.Tests.CheckUnifiedApi"; private const string ApplicationService = "CheckUnifiedApiWorker"; - private readonly double[] numbers_ = Enumerable.Range(0, - 10) - .Select(i => (double)i) - .ToArray(); + private readonly double[] numbers_ = System.Linq.Enumerable.Range(0, + 10) + .Select(i => (double)i) + .ToArray(); private UnifiedTestHelper unifiedTestHelper_; @@ -196,9 +196,9 @@ public void Check_That_We_Get_Exception_If_RetryCount_Is_Exceeded() { var nbTasksToSubmit = 3; var taskId = unifiedTestHelper_.Service.Submit("RandomTaskError", - Enumerable.Range(1, - nbTasksToSubmit) - .Select(_ => UnitTestHelperBase.ParamsHelper(100)), + System.Linq.Enumerable.Range(1, + nbTasksToSubmit) + .Select(_ => UnitTestHelperBase.ParamsHelper(100)), unifiedTestHelper_); var results = unifiedTestHelper_.WaitForResultcompletion(taskId); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPITestClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPITestClient.cs index 063b47b6..8dc5caf5 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPITestClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/CheckUnifiedApi/SimpleUnifiedAPITestClient.cs @@ -127,8 +127,8 @@ public override void EntryPoint() Configuration.GetSection("Grpc")["EndPoint"], 5001); - using var cs = ServiceFactory.CreateService(props, - LoggerFactory); + var cs = ServiceFactory.CreateService(props, + LoggerFactory); Log.LogInformation($"New session created : {cs.SessionId}"); @@ -224,9 +224,9 @@ private void ClientStartup2(ISubmitterService sessionService) const int wantedCount = 100; var tasksBasic = sessionService.Submit("ComputeBasicArrayCube", - Enumerable.Range(1, - wantedCount) - .Select(n => ParamsHelper(numbers)), + System.Linq.Enumerable.Range(1, + wantedCount) + .Select(n => ParamsHelper(numbers)), this); if (tasksBasic.Count() is var countBasic && countBasic != wantedCount) { @@ -235,9 +235,9 @@ private void ClientStartup2(ISubmitterService sessionService) var handler = new IgnoreErrorHandler(Log); var tasksRandom = sessionService.Submit("RandomTaskError", - Enumerable.Range(1, - wantedCount) - .Select(_ => ParamsHelper(25)), + System.Linq.Enumerable.Range(1, + wantedCount) + .Select(_ => ParamsHelper(25)), handler); if (tasksRandom.Count() is var countRandom && countRandom != wantedCount) { diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClient.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClient.cs index 0e02ef9e..3162614f 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClient.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClient.cs @@ -110,8 +110,8 @@ public override void EntryPoint() Configuration.GetSection("Grpc")["EndPoint"], 5001); - using var cs = ServiceFactory.CreateService(props, - LoggerFactory); + var cs = ServiceFactory.CreateService(props, + LoggerFactory); Log.LogInformation($"New session created : {cs.SessionId}"); @@ -171,10 +171,10 @@ private void ComputeVector(ISubmitterService sessionService, const int workloadTimeInMs = 100; nbResults_ = 0; - var numbers = Enumerable.Range(0, - nbElement) - .Select(x => (double)x) - .ToArray(); + var numbers = System.Linq.Enumerable.Range(0, + nbElement) + .Select(x => (double)x) + .ToArray(); Log.LogInformation($"=== Running from {nbTasks} tasks with payload by task {nbElement * 8 / 1024} Ko Total : {nbTasks * nbElement / 128} Ko... ==="); PeriodicInfo(() => diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClientTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClientTest.cs index 6af3e09c..22bdc5f8 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClientTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargePayloadSubmit/LargePayloadSubmitClientTest.cs @@ -58,10 +58,10 @@ public void Check_That_Large_Payload_Submission_Is_Working(int nbTasks, const int workloadTimeInMs = 100; using var cancellationTokenSource = new CancellationTokenSource(); - var numbers = Enumerable.Range(0, - nbElement) - .Select(x => (double)x) - .ToArray(); + var numbers = System.Linq.Enumerable.Range(0, + nbElement) + .Select(x => (double)x) + .ToArray(); var expectedResult = numbers.Sum(); var sw = Stopwatch.StartNew(); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncTest.cs index 8e6600c8..570bff5f 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/LargeSubmitAsyncTest.cs @@ -85,10 +85,10 @@ public void Check_That_Buffering_With_SubmitAsync_Is_Working(int nbTasks, var service = localUnifiedTestHelper.Service as Service; - var numbers = Enumerable.Range(1, - nbElementInWorkLoad) - .Select(elem => (double)elem) - .ToArray(); + var numbers = System.Linq.Enumerable.Range(1, + nbElementInWorkLoad) + .Select(elem => (double)elem) + .ToArray(); for (indexTask = 0; indexTask < nbTasks; indexTask++) diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/SubmitAsyncFixRequestOrder.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/SubmitAsyncFixRequestOrder.cs index 189f73e2..62a2abdb 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/SubmitAsyncFixRequestOrder.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/LargeSubmitAsync/SubmitAsyncFixRequestOrder.cs @@ -110,10 +110,10 @@ async Task Function(int i) taskIdExpectedResults[myTaskId] = expectedResult; } - Enumerable.Range(0, - nbTasks) - .LoopAsync(Function) - .Wait(cancellationSource.Token); + System.Linq.Enumerable.Range(0, + nbTasks) + .LoopAsync(Function) + .Wait(cancellationSource.Token); var taskResult = localUnifiedTestHelper.WaitForResultcompletion(taskIdExpectedResults.Select(elem => elem.Key)); Assert.IsNotNull(taskResult); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/PayloadIntegrityTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/PayloadIntegrityTest.cs index e98392dd..32bcf007 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/PayloadIntegrityTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/PayloadIntegrityTest.cs @@ -51,12 +51,12 @@ public void Setup() true, false) .AddEnvironmentVariables(); - _configuration = builder.Build(); - if (_configuration.GetSection("ProxyUrl") + configuration_ = builder.Build(); + if (configuration_.GetSection("ProxyUrl") .Exists()) { Environment.SetEnvironmentVariable("https_proxy", - _configuration.GetSection("ProxyUrl") + configuration_.GetSection("ProxyUrl") .Value, EnvironmentVariableTarget.Process); } @@ -94,7 +94,7 @@ public void Setup() private ResultHandler? resultHandler_; private readonly ConcurrentDictionary taskAndData_ = new(); private readonly ConcurrentDictionary responseAndData_ = new(); - private IConfigurationRoot _configuration; + private IConfigurationRoot configuration_; [TestCase(1, 1, @@ -113,7 +113,7 @@ public void CopyPayload(int maxConcurrentBuffers, var fixture = new Fixture(); var tasks = new List(); - var props = new Properties(_configuration, + var props = new Properties(configuration_, taskOptions_) { MaxConcurrentBuffers = maxConcurrentBuffers, @@ -136,7 +136,6 @@ public void CopyPayload(int maxConcurrentBuffers, CollectionAssert.AreEquivalent(taskAndData_, responseAndData_); - service.Dispose(); responseAndData_.Clear(); taskAndData_.Clear(); } diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/ResultHandler.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/ResultHandler.cs index e7f40bbc..8cad75a5 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/ResultHandler.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/PayloadIntegrityTestClient/ResultHandler.cs @@ -27,23 +27,23 @@ public delegate void HandleResponseType(object response, public class ResultHandler : IServiceInvocationHandler { - private readonly HandleErrorType _onError; - private readonly HandleResponseType _onResponse; + private readonly HandleErrorType onError_; + private readonly HandleResponseType onResponse_; public ResultHandler(HandleErrorType onError, HandleResponseType onResponse) { - _onError = onError; - _onResponse = onResponse; + onError_ = onError; + onResponse_ = onResponse; } public void HandleError(ServiceInvocationException e, string taskId) - => _onError(e, + => onError_(e, taskId); public void HandleResponse(object response, string taskId) - => _onResponse(response, + => onResponse_(response, taskId); } diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/Priority/Priority.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/Priority/Priority.cs index 05cf810a..32261211 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/Priority/Priority.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/Priority/Priority.cs @@ -107,16 +107,16 @@ public void TestThatPrioritiesAreAccountedFor() var tasks = new Dictionary(); - foreach (var t in Enumerable.Range(1, - 5)) + foreach (var t in System.Linq.Enumerable.Range(1, + 5)) { var options = properties.TaskOptions.Clone(); options.Priority = t; var payload = new ArmonikPayload("GetPriority", BitConverter.GetBytes(options.Priority), true).Serialize(); - foreach (var submitTask in service.SubmitTasks(Enumerable.Repeat(payload, - nTasksPerSessionPerPriority), + foreach (var submitTask in service.SubmitTasks(System.Linq.Enumerable.Repeat(payload, + nTasksPerSessionPerPriority), taskOptions: options)) { tasks.Add(submitTask, diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs index 30991cd2..43abfd08 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/UnitTestHelperBase.cs @@ -133,7 +133,7 @@ protected TaskOptions InitializeTaskOptions(EngineType engineType, }; } -public static class IEnumerable +public static class Enumerable { /// /// Extensions to loop Async all over IEnumerable without expected result diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/Assert.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/Assert.cs index 44929240..a4fb014f 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/Assert.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Common/Assert.cs @@ -21,9 +21,9 @@ public static class Assert public static void AreEqual(T expected, T value) { - if (!expected.Equals(value)) + if ((expected is null && value is not null) || (expected is not null && !expected.Equals(value))) { - throw new ArgumentException($"Excpected {expected}\nBut was: {value}"); + throw new ArgumentException($"Expected {expected}\nBut was: {value}"); } } } diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckRandomException/RandomExceptionSym.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckRandomException/RandomExceptionSym.cs index a39218d3..b4e81b0b 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckRandomException/RandomExceptionSym.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckRandomException/RandomExceptionSym.cs @@ -34,7 +34,7 @@ namespace ArmoniK.EndToEndTests.Worker.Tests.CheckRandomException; [PublicAPI] public class ServiceContainer : ServiceContainerBase { - private readonly Random rd = new(); + private readonly Random rd_ = new(); public override void OnCreateService(ServiceContext serviceContext) { @@ -77,7 +77,7 @@ private double ExpM1(double x) { var percentageOfFailure = 5.0; - var randNum = rd.NextDouble(); + var randNum = rd_.NextDouble(); if (randNum < percentageOfFailure / 100) { throw new MyCustomWorkerException("An expected failure in this random call"); diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckUnifiedApi/SimpleUnfiedAPITest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckUnifiedApi/SimpleUnfiedAPITest.cs index 020590e1..8c7be553 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckUnifiedApi/SimpleUnfiedAPITest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Worker/Tests/CheckUnifiedApi/SimpleUnfiedAPITest.cs @@ -25,7 +25,7 @@ namespace ArmoniK.EndToEndTests.Worker.Tests.CheckUnifiedApi; public class CheckUnifiedApiWorker : TaskWorkerService { - private readonly Random rd = new(); + private readonly Random rd_ = new(); public double[] ComputeBasicArrayCube(double[] inputs) => inputs.Select(x => x * x * x) @@ -83,7 +83,7 @@ public double[] NonStaticComputeMadd(byte[] inputs1, public double[] RandomTaskError(double percentageOfFailure = 25) { - var randNum = rd.NextDouble(); + var randNum = rd_.NextDouble(); if (randNum < percentageOfFailure / 100) { throw new UnifiedException("An expected failure in this random call"); diff --git a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj index 18d3c0f9..53a10bf6 100644 --- a/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj +++ b/Worker/src/Common/ArmoniK.DevelopmentKit.Worker.Common.csproj @@ -10,10 +10,10 @@ - + - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Worker/src/DLLWorker/ArmoniK.DevelopmentKit.Worker.DLLWorker.csproj b/Worker/src/DLLWorker/ArmoniK.DevelopmentKit.Worker.DLLWorker.csproj index 4d710097..7afbfa6c 100644 --- a/Worker/src/DLLWorker/ArmoniK.DevelopmentKit.Worker.DLLWorker.csproj +++ b/Worker/src/DLLWorker/ArmoniK.DevelopmentKit.Worker.DLLWorker.csproj @@ -17,7 +17,7 @@ - + diff --git a/Worker/src/Symphony/ArmoniK.DevelopmentKit.Worker.Symphony.csproj b/Worker/src/Symphony/ArmoniK.DevelopmentKit.Worker.Symphony.csproj index 76af5520..80261fbb 100644 --- a/Worker/src/Symphony/ArmoniK.DevelopmentKit.Worker.Symphony.csproj +++ b/Worker/src/Symphony/ArmoniK.DevelopmentKit.Worker.Symphony.csproj @@ -9,7 +9,7 @@ - + diff --git a/Worker/src/Unified/ArmoniK.DevelopmentKit.Worker.Unified.csproj b/Worker/src/Unified/ArmoniK.DevelopmentKit.Worker.Unified.csproj index c19395bf..8c0dfad2 100644 --- a/Worker/src/Unified/ArmoniK.DevelopmentKit.Worker.Unified.csproj +++ b/Worker/src/Unified/ArmoniK.DevelopmentKit.Worker.Unified.csproj @@ -13,7 +13,7 @@ - + diff --git a/kp.pk b/kp.pk new file mode 100644 index 00000000..143235ad Binary files /dev/null and b/kp.pk differ