diff --git a/Base/src/ArmoniK.Core.Base.csproj b/Base/src/ArmoniK.Core.Base.csproj index f4c66f188..a563b6110 100644 --- a/Base/src/ArmoniK.Core.Base.csproj +++ b/Base/src/ArmoniK.Core.Base.csproj @@ -26,7 +26,7 @@ - + diff --git a/Common/src/ArmoniK.Core.Common.csproj b/Common/src/ArmoniK.Core.Common.csproj index f9f323753..d2ab41d44 100644 --- a/Common/src/ArmoniK.Core.Common.csproj +++ b/Common/src/ArmoniK.Core.Common.csproj @@ -26,7 +26,7 @@ - + diff --git a/Common/src/Injection/Options/Pollster.cs b/Common/src/Injection/Options/Pollster.cs index 8ecbc7d8a..589a88960 100644 --- a/Common/src/Injection/Options/Pollster.cs +++ b/Common/src/Injection/Options/Pollster.cs @@ -50,4 +50,14 @@ public class Pollster /// This happens in parallel of the execution of another task /// public TimeSpan TimeoutBeforeNextAcquisition { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// Shared folder between Agent and Worker + /// + public string SharedCacheFolder { get; set; } = "/cache/shared"; + + /// + /// Internal cache for data + /// + public string InternalCacheFolder { get; set; } = "/cache/internal"; } diff --git a/Common/src/Pollster/AgentHandler.cs b/Common/src/Pollster/AgentHandler.cs index 0eb9a11e7..3e459621c 100644 --- a/Common/src/Pollster/AgentHandler.cs +++ b/Common/src/Pollster/AgentHandler.cs @@ -133,6 +133,7 @@ public async Task Start(string token, ILogger logger, SessionData sessionData, TaskData taskData, + string folder, CancellationToken cancellationToken) { try @@ -144,6 +145,7 @@ public async Task Start(string token, taskTable_, sessionData, taskData, + folder, token, logger); diff --git a/Common/src/Pollster/ComputeRequestQueue.cs b/Common/src/Pollster/ComputeRequestQueue.cs deleted file mode 100644 index 18c8b2fc3..000000000 --- a/Common/src/Pollster/ComputeRequestQueue.cs +++ /dev/null @@ -1,198 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Collections.Generic; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; -using ArmoniK.Core.Common.StateMachines; - -using Google.Protobuf; - -using Microsoft.Extensions.Logging; - -namespace ArmoniK.Core.Common.Pollster; - -/// -/// Queue to identify input data with a while verifying -/// the request ordering with state machines -/// -public class ComputeRequestQueue -{ - private readonly Queue computeRequests_; - - private readonly ILogger logger_; - private readonly ComputeRequestStateMachine machine_; - - /// - /// Initializes a queue that stores - /// - /// - public ComputeRequestQueue(ILogger logger) - { - logger_ = logger; - computeRequests_ = new Queue(); - machine_ = new ComputeRequestStateMachine(logger_); - } - - /// - /// Create the init computation request with the given parameters - /// - /// The maximum size of a data chunk - /// The session identifier - /// The task identifier - /// The options of the task - /// The input data of the task - /// Collection of data ids for the expected outputs of the task - /// Invalid request according to the state machine - public void Init(int dataChunkMaxSize, - string sessionId, - string taskId, - TaskOptions taskOptions, - ByteString? payload, - IList expectedOutputKeys) - { - machine_.InitRequest(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - InitRequest = new ProcessRequest.Types.ComputeRequest.Types.InitRequest - { - Configuration = new Configuration - { - DataChunkMaxSize = dataChunkMaxSize, - }, - TaskId = taskId, - SessionId = sessionId, - TaskOptions = taskOptions, - Payload = payload is not null - ? new DataChunk - { - Data = payload, - } - : new DataChunk(), - ExpectedOutputKeys = - { - expectedOutputKeys, - }, - }, - }); - } - - /// - /// Add the given payload chunk to the request queue - /// - /// Data chunk - /// Invalid request according to the state machine - public void AddPayloadChunk(ByteString chunk) - { - machine_.AddPayloadChunk(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - Payload = new DataChunk - { - Data = chunk, - }, - }); - } - - /// - /// Add a request representing the end of the payload in the queue - /// - /// Invalid request according to the state machine - public void CompletePayload() - { - machine_.CompletePayload(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - Payload = new DataChunk - { - DataComplete = true, - }, - }); - } - - /// - /// Add a request representing the start of a data dependency in the queue - /// - /// The identifier of the data dependency - /// Invalid request according to the state machine - public void InitDataDependency(string key) - { - machine_.InitDataDependency(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - InitData = new ProcessRequest.Types.ComputeRequest.Types.InitData - { - Key = key, - }, - }); - } - - /// - /// Add a request containing the given data chunk for the data dependency in the queue - /// - /// Data chunk - /// Invalid request according to the state machine - public void AddDataDependencyChunk(ByteString chunk) - { - machine_.AddDataDependencyChunk(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - Data = new DataChunk - { - Data = chunk, - }, - }); - } - - /// - /// Add a request representing the end of the data dependency in the queue - /// - /// Invalid request according to the state machine - public void CompleteDataDependency() - { - machine_.CompleteDataDependency(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - Data = new DataChunk - { - DataComplete = true, - }, - }); - } - - /// - /// Get the queue with the complete compute request - /// - /// - /// The queue with the complete compute request - /// - /// Invalid request according to the state machine - public Queue GetQueue() - { - machine_.CompleteRequest(); - computeRequests_.Enqueue(new ProcessRequest.Types.ComputeRequest - { - InitData = new ProcessRequest.Types.ComputeRequest.Types.InitData - { - LastData = true, - }, - }); - return computeRequests_; - } -} diff --git a/Common/src/Pollster/DataPrefetcher.cs b/Common/src/Pollster/DataPrefetcher.cs index a34ee3615..fff32105b 100644 --- a/Common/src/Pollster/DataPrefetcher.cs +++ b/Common/src/Pollster/DataPrefetcher.cs @@ -16,22 +16,17 @@ // along with this program. If not, see . using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; +using System.IO; using System.Threading; using System.Threading.Tasks; using ArmoniK.Api.Common.Utils; -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; using ArmoniK.Core.Common.Storage; -using Google.Protobuf; - using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; @@ -84,58 +79,49 @@ await objectStorage_.Init(cancellationToken) /// Method used to prefetch data before executing a task /// /// Task metadata + /// /// Token used to cancel the execution of the method /// /// Queue containing the request containing the data for the task which can be sent to the worker /// /// input data are not found /// invalid transition between states - public async Task> PrefetchDataAsync(TaskData taskData, - CancellationToken cancellationToken) + public async Task PrefetchDataAsync(TaskData taskData, + string folder, + CancellationToken cancellationToken) { using var activity = activitySource_?.StartActivity(); using var sessionScope = logger_.BeginPropertyScope(("sessionId", taskData.SessionId)); activity?.AddEvent(new ActivityEvent("Load payload")); - var payloadChunks = await objectStorage_.GetValuesAsync(taskData.PayloadId, - cancellationToken) - .Select(bytes => UnsafeByteOperations.UnsafeWrap(bytes)) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - - var computeRequests = new ComputeRequestQueue(logger_); - computeRequests.Init(PayloadConfiguration.MaxChunkSize, - taskData.SessionId, - taskData.TaskId, - taskData.Options.ToGrpcTaskOptions(), - payloadChunks.FirstOrDefault(), - taskData.ExpectedOutputIds); - for (var i = 1; i < payloadChunks.Count; i++) + await using (var fs = new FileStream(Path.Combine(folder, + taskData.PayloadId), + FileMode.OpenOrCreate)) { - computeRequests.AddPayloadChunk(payloadChunks[i]); + await using var w = new BinaryWriter(fs); + await foreach (var chunk in objectStorage_.GetValuesAsync(taskData.PayloadId, + cancellationToken) + .ConfigureAwait(false)) + { + w.Write(chunk); + } } - computeRequests.CompletePayload(); foreach (var dataDependency in taskData.DataDependencies) { - var dependencyChunks = await objectStorage_.GetValuesAsync(dataDependency, - cancellationToken) - .Select(bytes => UnsafeByteOperations.UnsafeWrap(bytes)) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - - computeRequests.InitDataDependency(dataDependency); - foreach (var chunk in dependencyChunks) + await using var fs = new FileStream(Path.Combine(folder, + dataDependency), + FileMode.OpenOrCreate); + await using var w = new BinaryWriter(fs); + await foreach (var chunk in objectStorage_.GetValuesAsync(dataDependency, + cancellationToken) + .ConfigureAwait(false)) { - computeRequests.AddDataDependencyChunk(chunk); + w.Write(chunk); } - - computeRequests.CompleteDataDependency(); } - - return computeRequests.GetQueue(); } } diff --git a/Common/src/Pollster/IAgentHandler.cs b/Common/src/Pollster/IAgentHandler.cs index 4f0db4103..7cdeeeef1 100644 --- a/Common/src/Pollster/IAgentHandler.cs +++ b/Common/src/Pollster/IAgentHandler.cs @@ -46,6 +46,7 @@ public interface IAgentHandler /// Logger that may be injected into the handler that embed preconfigured scopes /// Session metadata /// Task metadata + /// Shared folder between Agent and Worker /// Token used to cancel the execution of the method /// /// Task representing the asynchronous execution of the method @@ -54,5 +55,6 @@ Task Start(string token, ILogger logger, SessionData sessionData, TaskData taskData, + string folder, CancellationToken cancellationToken); } diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 35814cd7b..506de8b3f 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -16,8 +16,8 @@ // along with this program. If not, see . using System; -using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -43,30 +43,30 @@ namespace ArmoniK.Core.Common.Pollster; public sealed class TaskHandler : IAsyncDisposable { - private readonly ActivitySource activitySource_; - private readonly IAgentHandler agentHandler_; - private readonly CancellationTokenSource cancellationTokenSource_; - private readonly DataPrefetcher dataPrefetcher_; - private readonly TimeSpan delayBeforeAcquisition_; - private readonly ILogger logger_; - private readonly IQueueMessageHandler messageHandler_; - private readonly Action onDispose_; - private readonly string ownerPodId_; - private readonly string ownerPodName_; - private readonly CancellationTokenRegistration reg1_; - private readonly IResultTable resultTable_; - private readonly ISessionTable sessionTable_; - private readonly ISubmitter submitter_; - private readonly ITaskProcessingChecker taskProcessingChecker_; - private readonly ITaskTable taskTable_; - private readonly string token_; - private readonly CancellationTokenSource workerConnectionCts_; - private readonly IWorkerStreamHandler workerStreamHandler_; - private IAgent? agent_; - private Queue? computeRequestStream_; - private ProcessReply? reply_; - private SessionData? sessionData_; - private TaskData? taskData_; + private readonly ActivitySource activitySource_; + private readonly IAgentHandler agentHandler_; + private readonly CancellationTokenSource cancellationTokenSource_; + private readonly DataPrefetcher dataPrefetcher_; + private readonly TimeSpan delayBeforeAcquisition_; + private readonly string folder_; + private readonly ILogger logger_; + private readonly IQueueMessageHandler messageHandler_; + private readonly Action onDispose_; + private readonly string ownerPodId_; + private readonly string ownerPodName_; + private readonly CancellationTokenRegistration reg1_; + private readonly IResultTable resultTable_; + private readonly ISessionTable sessionTable_; + private readonly ISubmitter submitter_; + private readonly ITaskProcessingChecker taskProcessingChecker_; + private readonly ITaskTable taskTable_; + private readonly string token_; + private readonly CancellationTokenSource workerConnectionCts_; + private readonly IWorkerStreamHandler workerStreamHandler_; + private IAgent? agent_; + private ProcessReply? reply_; + private SessionData? sessionData_; + private TaskData? taskData_; public TaskHandler(ISessionTable sessionTable, ITaskTable taskTable, @@ -103,6 +103,9 @@ public TaskHandler(ISessionTable sessionTable, sessionData_ = null; token_ = Guid.NewGuid() .ToString(); + folder_ = Path.Combine(pollsterOptions.SharedCacheFolder, + token_); + Directory.CreateDirectory(folder_); delayBeforeAcquisition_ = pollsterOptions.TimeoutBeforeNextAcquisition + TimeSpan.FromSeconds(2); workerConnectionCts_ = new CancellationTokenSource(); @@ -139,6 +142,14 @@ await reg1_.DisposeAsync() cancellationTokenSource_.Dispose(); workerConnectionCts_.Dispose(); agent_?.Dispose(); + try + { + Directory.Delete(folder_, + true); + } + catch (DirectoryNotFoundException) + { + } } /// @@ -470,9 +481,10 @@ public async Task PreProcessing() try { - computeRequestStream_ = await dataPrefetcher_.PrefetchDataAsync(taskData_, - cancellationTokenSource_.Token) - .ConfigureAwait(false); + await dataPrefetcher_.PrefetchDataAsync(taskData_, + folder_, + cancellationTokenSource_.Token) + .ConfigureAwait(false); } catch (Exception e) { @@ -493,7 +505,7 @@ await HandleErrorRequeueAsync(e, /// worker pipe is not initialized public async Task ExecuteTask() { - if (computeRequestStream_ == null || taskData_ == null || sessionData_ == null) + if (taskData_ == null || sessionData_ == null) { throw new NullReferenceException(); } @@ -512,6 +524,7 @@ public async Task ExecuteTask() logger_, sessionData_, taskData_, + folder_, cancellationTokenSource_.Token) .ConfigureAwait(false); @@ -524,27 +537,6 @@ public async Task ExecuteTask() await taskTable_.StartTask(taskData_, cancellationTokenSource_.Token) .ConfigureAwait(false); - - workerStreamHandler_.StartTaskProcessing(taskData_, - workerConnectionCts_.Token); - - if (workerStreamHandler_.Pipe is null) - { - throw new ArmoniKException($"{nameof(IWorkerStreamHandler.Pipe)} should not be null"); - } - - while (computeRequestStream_.TryDequeue(out var computeRequest)) - { - await workerStreamHandler_.Pipe.WriteAsync(new ProcessRequest - { - Compute = computeRequest, - CommunicationToken = token_, - }) - .ConfigureAwait(false); - } - - await workerStreamHandler_.Pipe.CompleteAsync() - .ConfigureAwait(false); } catch (TaskAlreadyInFinalStateException e) { @@ -565,7 +557,29 @@ await HandleErrorRequeueAsync(e, { // at this point worker requests should have ended logger_.LogDebug("Wait for task output"); - reply_ = await workerStreamHandler_.Pipe!.ReadAsync(workerConnectionCts_.Token) + reply_ = await workerStreamHandler_.StartTaskProcessing(new ProcessRequest + { + CommunicationToken = token_, + Configuration = new Configuration + { + DataChunkMaxSize = PayloadConfiguration.MaxChunkSize, + }, + DataDependencies = + { + taskData_.DataDependencies, + }, + DataFolder = folder_, + ExpectedOutputKeys = + { + taskData_.ExpectedOutputIds, + }, + PayloadId = taskData_.PayloadId, + SessionId = taskData_.SessionId, + TaskId = taskData_.SessionId, + TaskOptions = taskData_.Options.ToGrpcTaskOptions(), + }, + taskData_.Options.MaxDuration, + workerConnectionCts_.Token) .ConfigureAwait(false); logger_.LogDebug("Stop agent server"); @@ -574,9 +588,9 @@ await agentHandler_.Stop(workerConnectionCts_.Token) } catch (Exception e) { - await HandleErrorResubmitAsync(e, - taskData_, - cancellationTokenSource_.Token) + await HandleErrorTaskExecutionAsync(e, + taskData_, + cancellationTokenSource_.Token) .ConfigureAwait(false); } } @@ -590,11 +604,6 @@ await HandleErrorResubmitAsync(e, /// wrong order of execution public async Task PostProcessing() { - if (workerStreamHandler_.Pipe is null) - { - throw new NullReferenceException(nameof(workerStreamHandler_.Pipe) + " is null."); - } - if (taskData_ is null) { throw new NullReferenceException(nameof(taskData_) + " is null."); @@ -655,6 +664,16 @@ private async Task HandleErrorRequeueAsync(Exception e, cancellationToken) .ConfigureAwait(false); + private async Task HandleErrorTaskExecutionAsync(Exception e, + TaskData taskData, + CancellationToken cancellationToken) + => await HandleErrorInternalAsync(e, + taskData, + true, + true, + cancellationToken) + .ConfigureAwait(false); + private async Task HandleErrorResubmitAsync(Exception e, TaskData taskData, CancellationToken cancellationToken) diff --git a/Common/src/StateMachines/ComputeRequestStateMachine.cs b/Common/src/StateMachines/ComputeRequestStateMachine.cs deleted file mode 100644 index bf23da2a2..000000000 --- a/Common/src/StateMachines/ComputeRequestStateMachine.cs +++ /dev/null @@ -1,281 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Linq; -using System.Text; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; - -using Microsoft.Extensions.Logging; - -using Stateless; -using Stateless.Graph; - -namespace ArmoniK.Core.Common.StateMachines; - -/// -/// Utility class for the Final State Machine from -/// -public class ComputeRequestStateMachine -{ - /// - /// States for the Final State Machine - /// - public enum State - { - /// - /// Initial state of the Final State Machine. - /// - Init, - - /// - /// State after receiving . - /// - InitRequest, - - /// - /// State when receiving payload chunk. - /// - PayloadData, - - /// - /// State after reception of the last payload chunk. - /// - PayloadComplete, - - /// - /// State for initiating the reception of the data for a data dependency. - /// - DataInit, - - /// - /// State when receiving the last data chunk of the given data dependency. - /// - DataComplete, - - /// - /// State when receiving a data chunk. - /// - Data, - - /// - /// State when receiving the last data dependency. - /// - DataLast, - } - - /// - /// Transitions for the Final State Machine - /// - public enum Triggers - { - /// - /// Correspond to receive last request - /// - CompleteRequest, - - /// - /// Correspond to receive request as - /// - /// - CompleteDataDependency, - - /// - /// Correspond to receive request as - /// - /// - AddDataDependencyChunk, - - /// - /// Correspond to receive request - /// - InitDataDependency, - - /// - /// Correspond to receive request as - /// - /// - CompletePayload, - - /// - /// Correspond to receive request as - /// - /// - AddPayloadChunk, - - /// - /// Correspond to receive request - /// - InitRequest, - } - - private readonly ILogger logger_; - - private readonly StateMachine machine_; - - /// - /// Constructor that initializes the Final State Machine - /// - /// Logger used to produce logs for this class - public ComputeRequestStateMachine(ILogger logger) - { - logger_ = logger; - machine_ = new StateMachine(State.Init); - - machine_.Configure(State.Init) - .Permit(Triggers.InitRequest, - State.InitRequest); - - machine_.Configure(State.InitRequest) - .Permit(Triggers.AddPayloadChunk, - State.PayloadData) - .Permit(Triggers.CompletePayload, - State.PayloadComplete); - - machine_.Configure(State.PayloadData) - .PermitReentry(Triggers.AddPayloadChunk) - .Permit(Triggers.CompletePayload, - State.PayloadComplete); - - machine_.Configure(State.PayloadComplete) - .Permit(Triggers.InitDataDependency, - State.DataInit) - .Permit(Triggers.CompleteRequest, - State.DataLast); - - machine_.Configure(State.DataInit) - .Permit(Triggers.AddDataDependencyChunk, - State.Data); - - machine_.Configure(State.Data) - .PermitReentry(Triggers.AddDataDependencyChunk) - .Permit(Triggers.CompleteDataDependency, - State.DataComplete); - - machine_.Configure(State.DataComplete) - .Permit(Triggers.CompleteRequest, - State.DataLast) - .Permit(Triggers.InitDataDependency, - State.DataInit); - - if (logger_.IsEnabled(LogLevel.Debug)) - { - machine_.OnTransitioned(t => logger_.LogDebug("OnTransitioned {FSM}: {Source} -> {Destination}", - nameof(ComputeRequestStateMachine), - t.Source, - t.Destination)); - } - } - - /// - /// Function used when using transition - /// - /// Invalid transition - public void InitRequest() - => machine_.Fire(Triggers.InitRequest); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void AddPayloadChunk() - => machine_.Fire(Triggers.AddPayloadChunk); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void CompletePayload() - => machine_.Fire(Triggers.CompletePayload); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void InitDataDependency() - => machine_.Fire(Triggers.InitDataDependency); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void AddDataDependencyChunk() - => machine_.Fire(Triggers.AddDataDependencyChunk); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void CompleteDataDependency() - => machine_.Fire(Triggers.CompleteDataDependency); - - /// - /// Function used when using transition - /// - /// Invalid transition - public void CompleteRequest() - => machine_.Fire(Triggers.CompleteRequest); - - /// - /// Generate a dot graph representing the Final State Machine - /// - /// - /// A string containing the graph in dot format - /// - public string GenerateGraph() - => UmlDotGraph.Format(machine_.GetInfo()); - - /// - /// Generate a Mermaid graph representing the Final State Machine - /// - /// - /// A string containing the graph in Mermaid format - /// - public string GenerateMermaidGraph() - { - var str = UmlMermaidGraph.Format(machine_.GetInfo()); - - // Manually fix the footer; the last - // 3 lines should be disposed - var lines = str.Split(new[] - { - Environment.NewLine, - }, - StringSplitOptions.None); - str = string.Join(Environment.NewLine, - lines.Take(lines.Length - 3)); - - // Enclose in markers for markdown - var bld = new StringBuilder(str); - bld.Insert(0, - "```mermaid\n"); - bld.Append("\n```\n"); - - return bld.ToString(); - } - - /// - /// Get the current state of the Final State Machine - /// - /// - /// The current state of the Final State Machine - /// - public State GetState() - => machine_.State; -} diff --git a/Common/src/Stream/Worker/IWorkerStreamHandler.cs b/Common/src/Stream/Worker/IWorkerStreamHandler.cs index be0bb014d..7632a9aa7 100644 --- a/Common/src/Stream/Worker/IWorkerStreamHandler.cs +++ b/Common/src/Stream/Worker/IWorkerStreamHandler.cs @@ -17,11 +17,10 @@ using System; using System.Threading; +using System.Threading.Tasks; using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; -using ArmoniK.Core.Common.Storage; -using ArmoniK.Core.Common.Utils; using JetBrains.Annotations; @@ -30,8 +29,7 @@ namespace ArmoniK.Core.Common.Stream.Worker; [PublicAPI] public interface IWorkerStreamHandler : IInitializable, IDisposable { - public IAsyncPipe? Pipe { get; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken); + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken); } diff --git a/Common/src/Stream/Worker/WorkerStreamHandler.cs b/Common/src/Stream/Worker/WorkerStreamHandler.cs index a20d308cd..0e09676c3 100644 --- a/Common/src/Stream/Worker/WorkerStreamHandler.cs +++ b/Common/src/Stream/Worker/WorkerStreamHandler.cs @@ -25,8 +25,6 @@ using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; using ArmoniK.Core.Common.Injection.Options; -using ArmoniK.Core.Common.Storage; -using ArmoniK.Core.Common.Utils; using Grpc.Core; @@ -37,13 +35,12 @@ namespace ArmoniK.Core.Common.Stream.Worker; public class WorkerStreamHandler : IWorkerStreamHandler { - private readonly GrpcChannelProvider channelProvider_; - private readonly ILogger logger_; - private readonly InitWorker optionsInitWorker_; - private bool isInitialized_; - private int retryCheck_; - private AsyncClientStreamingCall? stream_; - private Api.gRPC.V1.Worker.Worker.WorkerClient? workerClient_; + private readonly GrpcChannelProvider channelProvider_; + private readonly ILogger logger_; + private readonly InitWorker optionsInitWorker_; + private bool isInitialized_; + private int retryCheck_; + private Api.gRPC.V1.Worker.Worker.WorkerClient? workerClient_; public WorkerStreamHandler(GrpcChannelProvider channelProvider, InitWorker optionsInitWorker, @@ -54,28 +51,6 @@ public WorkerStreamHandler(GrpcChannelProvider channelProvider, logger_ = logger; } - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - { - if (workerClient_ == null) - { - throw new ArmoniKException("Worker client should be initialized"); - } - - stream_ = workerClient_.Process(deadline: DateTime.UtcNow + taskData.Options.MaxDuration, - cancellationToken: cancellationToken); - - if (stream_ is null) - { - throw new ArmoniKException($"Failed to recuperate Stream for {taskData.TaskId}"); - } - - Pipe = new GrpcAsyncPipe(stream_.ResponseAsync, - stream_.RequestStream); - } - - public IAsyncPipe? Pipe { get; private set; } - public async Task Init(CancellationToken cancellationToken) { @@ -152,9 +127,20 @@ public async Task Check(HealthCheckTag tag) } public void Dispose() + => GC.SuppressFinalize(this); + + public async Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) { - stream_?.Dispose(); - GC.SuppressFinalize(this); + if (workerClient_ == null) + { + throw new ArmoniKException("Worker client should be initialized"); + } + + return await workerClient_.ProcessAsync(request, + deadline: DateTime.UtcNow + duration, + cancellationToken: cancellationToken).ConfigureAwait(false); } private Task CheckWorker(CancellationToken cancellationToken) diff --git a/Common/src/Utils/GrpcAsyncPipe.cs b/Common/src/Utils/GrpcAsyncPipe.cs deleted file mode 100644 index 4d607e39b..000000000 --- a/Common/src/Utils/GrpcAsyncPipe.cs +++ /dev/null @@ -1,58 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -using Grpc.Core; - -namespace ArmoniK.Core.Common.Utils; - -internal class GrpcAsyncPipe : IAsyncPipe -{ - private readonly Task response_; - private readonly IClientStreamWriter writer_; - - public GrpcAsyncPipe(Task response, - IClientStreamWriter writer) - { - response_ = response; - writer_ = writer; - writer_.WriteOptions = new WriteOptions(WriteFlags.NoCompress); - } - - - public async Task ReadAsync(CancellationToken cancellationToken) - => await response_.WaitAsync(cancellationToken) - .ConfigureAwait(false); - - public Task WriteAsync(TWriteMessage message) - => writer_.WriteAsync(message); - - public async Task WriteAsync(IEnumerable message) - { - foreach (var writeMessage in message) - { - await writer_.WriteAsync(writeMessage) - .ConfigureAwait(false); - } - } - - public Task CompleteAsync() - => writer_.CompleteAsync(); -} diff --git a/Common/src/Utils/IAsyncPipe.cs b/Common/src/Utils/IAsyncPipe.cs deleted file mode 100644 index a2cfe6418..000000000 --- a/Common/src/Utils/IAsyncPipe.cs +++ /dev/null @@ -1,33 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace ArmoniK.Core.Common.Utils; - -public interface IAsyncPipe -{ - Task ReadAsync(CancellationToken cancellationToken); - - Task WriteAsync(TWriteMessage message); - - Task WriteAsync(IEnumerable message); - - Task CompleteAsync(); -} diff --git a/Common/src/gRPC/Services/Agent.cs b/Common/src/gRPC/Services/Agent.cs index e9ebb7c0e..4b42b9dec 100644 --- a/Common/src/gRPC/Services/Agent.cs +++ b/Common/src/gRPC/Services/Agent.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Channels; @@ -32,15 +33,12 @@ using ArmoniK.Core.Common.Storage; using ArmoniK.Utils; -using Google.Protobuf; - using Grpc.Core; using Microsoft.Extensions.Logging; using static Google.Protobuf.WellKnownTypes.Timestamp; -using Result = ArmoniK.Api.gRPC.V1.Agent.Result; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; namespace ArmoniK.Core.Common.gRPC.Services; @@ -51,6 +49,7 @@ namespace ArmoniK.Core.Common.gRPC.Services; public sealed class Agent : IAgent { private readonly List createdTasks_; + private readonly string folder_; private readonly ILogger logger_; private readonly IObjectStorage objectStorage_; private readonly IPushQueueStorage pushQueueStorage_; @@ -72,6 +71,7 @@ public sealed class Agent : IAgent /// Interface to manage task states /// Data of the session /// Data of the task + /// /// Token send to the worker to identify the running task /// Logger used to produce logs for this class public Agent(ISubmitter submitter, @@ -81,6 +81,7 @@ public Agent(ISubmitter submitter, ITaskTable taskTable, SessionData sessionData, TaskData taskData, + string folder, string token, ILogger logger) { @@ -94,6 +95,7 @@ public Agent(ISubmitter submitter, sentResults_ = new List(); sessionData_ = sessionData; taskData_ = taskData; + folder_ = folder; token_ = token; } @@ -299,286 +301,19 @@ await taskRequestsChannel.Writer.WriteAsync(new TaskRequest(request.InitTask.Hea return new CreateTaskReply(); } - /// - public async Task GetCommonData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) - { - using var _ = logger_.BeginNamedScope(nameof(GetCommonData), - ("taskId", taskData_.TaskId), - ("sessionId", sessionData_.SessionId)); - if (string.IsNullOrEmpty(request.CommunicationToken)) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Missing communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - if (request.CommunicationToken != token_) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Wrong communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - await responseStream.WriteAsync(new DataReply - { - Error = "Common data are not supported yet", - }, - cancellationToken) - .ConfigureAwait(false); - } - - /// - public async Task GetDirectData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) - { - using var _ = logger_.BeginNamedScope(nameof(GetDirectData), - ("taskId", taskData_.TaskId), - ("sessionId", sessionData_.SessionId)); - - if (string.IsNullOrEmpty(request.CommunicationToken)) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Missing communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - if (request.CommunicationToken != token_) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Wrong communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - await responseStream.WriteAsync(new DataReply - { - Error = "Direct data are not supported yet", - }, - cancellationToken) - .ConfigureAwait(false); - } - - /// - public async Task GetResourceData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) - { - using var _ = logger_.BeginNamedScope(nameof(GetResourceData), - ("taskId", taskData_.TaskId), - ("sessionId", sessionData_.SessionId)); - - if (string.IsNullOrEmpty(request.CommunicationToken)) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Missing communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - if (request.CommunicationToken != token_) - { - await responseStream.WriteAsync(new DataReply - { - CommunicationToken = request.CommunicationToken, - Error = "Wrong communication token", - }, - cancellationToken) - .ConfigureAwait(false); - return; - } - - try - { - await foreach (var data in objectStorage_.GetValuesAsync(request.Key, - cancellationToken) - .ConfigureAwait(false)) - { - await responseStream.WriteAsync(new DataReply - { - Data = new DataChunk - { - Data = UnsafeByteOperations.UnsafeWrap(data), - }, - }, - cancellationToken) - .ConfigureAwait(false); - } - - await responseStream.WriteAsync(new DataReply - { - Data = new DataChunk - { - DataComplete = true, - }, - }, - cancellationToken) - .ConfigureAwait(false); - } - catch (ObjectDataNotFoundException) - { - await responseStream.WriteAsync(new DataReply - { - Init = new DataReply.Types.Init - { - Key = request.Key, - Error = "Key not found", - }, - }, - cancellationToken) - .ConfigureAwait(false); - } - } - - /// - [SuppressMessage("Usage", - "CA2208:Instantiate argument exceptions correctly", - Justification = "")] - public async Task SendResult(IAsyncStreamReader requestStream, - CancellationToken cancellationToken) - { - using var _ = logger_.BeginNamedScope(nameof(SendResult), - ("taskId", taskData_.TaskId), - ("sessionId", sessionData_.SessionId)); - - var completionTask = Task.CompletedTask; - var fsmResult = new ProcessReplyResultStateMachine(logger_); - var chunksChannel = Channel.CreateUnbounded>(new UnboundedChannelOptions - { - SingleWriter = true, - SingleReader = true, - }); - - await foreach (var request in requestStream.ReadAllAsync(cancellationToken) - .ConfigureAwait(false)) - { - if (string.IsNullOrEmpty(request.CommunicationToken)) - { - return new ResultReply - { - CommunicationToken = request.CommunicationToken, - Error = "Missing communication token", - }; - } - - if (request.CommunicationToken != token_) - { - return new ResultReply - { - CommunicationToken = request.CommunicationToken, - Error = "Wrong communication token", - }; - } - - switch (request.TypeCase) - { - case Result.TypeOneofCase.Init: - switch (request.Init.TypeCase) - { - case InitKeyedDataStream.TypeOneofCase.Key: - fsmResult.InitKey(); - completionTask = Task.Run(async () => await objectStorage_.AddOrUpdateAsync(request.Init.Key, - chunksChannel.Reader.ReadAllAsync(cancellationToken), - cancellationToken) - .ConfigureAwait(false), - cancellationToken); - sentResults_.Add(request.Init.Key); - break; - case InitKeyedDataStream.TypeOneofCase.LastResult: - fsmResult.CompleteRequest(); - - try - { - await completionTask.WaitAsync(cancellationToken) - .ConfigureAwait(false); - return new ResultReply - { - Ok = new Empty(), - }; - } - catch (Exception e) - { - logger_.LogWarning(e, - "Error while receiving results"); - return new ResultReply - { - Error = "Error while receiving results", - }; - } - - case InitKeyedDataStream.TypeOneofCase.None: - default: - throw new ArgumentOutOfRangeException(nameof(request.Init.TypeCase)); - } - - break; - case Result.TypeOneofCase.Data: - switch (request.Data.TypeCase) - { - case DataChunk.TypeOneofCase.Data: - fsmResult.AddDataChunk(); - await chunksChannel.Writer.WriteAsync(request.Data.Data.Memory, - cancellationToken) - .ConfigureAwait(false); - break; - case DataChunk.TypeOneofCase.DataComplete: - fsmResult.CompleteData(); - chunksChannel.Writer.Complete(); - break; - - case DataChunk.TypeOneofCase.None: - default: - throw new ArgumentOutOfRangeException(nameof(request.Data.TypeCase)); - } - - break; - case Result.TypeOneofCase.None: - default: - throw new ArgumentOutOfRangeException(nameof(request.TypeCase)); - } - } - - return new ResultReply(); - } - /// public async Task CreateResultsMetaData(CreateResultsMetaDataRequest request, CancellationToken cancellationToken) { - var results = request.Results.Select(rc => new Storage.Result(request.SessionId, - Guid.NewGuid() - .ToString(), - rc.Name, - "", - ResultStatus.Created, - new List(), - DateTime.UtcNow, - Array.Empty())) + var results = request.Results.Select(rc => new Result(request.SessionId, + Guid.NewGuid() + .ToString(), + rc.Name, + "", + ResultStatus.Created, + new List(), + DateTime.UtcNow, + Array.Empty())) .ToList(); await resultTable_.Create(results, @@ -654,60 +389,21 @@ await TaskLifeCycleHelper.CreateTasks(taskTable_, }; } - /// - public async Task UploadResultData(IAsyncStreamReader requestStream, - CancellationToken cancellationToken) - { - if (!await requestStream.MoveNext(cancellationToken) - .ConfigureAwait(false)) - { - throw new RpcException(new Status(StatusCode.FailedPrecondition, - "Missing result metadata"), - "Missing result metadata"); - } - - var current = requestStream.Current; - - if (current.TypeCase != UploadResultDataRequest.TypeOneofCase.Id) - { - throw new RpcException(new Status(StatusCode.InvalidArgument, - "Message should be an Id"), - "Message should be an Id"); - } - - var id = current.Id; - - - await objectStorage_.AddOrUpdateAsync(id.ResultId, - requestStream.ReadAllAsync(cancellationToken) - .Select(r => r.DataChunk.Memory), - cancellationToken) - .ConfigureAwait(false); - - sentResults_.Add(id.ResultId); - - return new UploadResultDataResponse - { - ResultId = id.ResultId, - CommunicationToken = token_, - }; - } - /// public async Task CreateResults(CreateResultsRequest request, CancellationToken cancellationToken) { var results = await request.Results.Select(async rc => { - var result = new Storage.Result(request.SessionId, - Guid.NewGuid() - .ToString(), - rc.Name, - "", - ResultStatus.Created, - new List(), - DateTime.UtcNow, - Array.Empty()); + var result = new Result(request.SessionId, + Guid.NewGuid() + .ToString(), + rc.Name, + "", + ResultStatus.Created, + new List(), + DateTime.UtcNow, + Array.Empty()); await objectStorage_.AddOrUpdateAsync(result.ResultId, new List> @@ -745,8 +441,166 @@ await resultTable_.Create(results, }; } + public async Task NotifyResultData(NotifyResultDataRequest request, + CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(request.CommunicationToken)) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Missing communication token"), + "Missing communication token"); + } + + if (request.CommunicationToken != token_) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Wrong communication token"), + "Wrong communication token"); + } + + foreach (var result in request.Ids) + { + await using var fs = new FileStream(Path.Combine(folder_, + result.ResultId), + FileMode.OpenOrCreate); + using var r = new BinaryReader(fs); + var channel = Channel.CreateUnbounded>(); + + var add = objectStorage_.AddOrUpdateAsync(result.ResultId, + channel.Reader.ReadAllAsync(cancellationToken), + cancellationToken); + + int read; + do + { + var buffer = new byte[PayloadConfiguration.MaxChunkSize]; + read = r.Read(buffer, + 0, + PayloadConfiguration.MaxChunkSize); + await channel.Writer.WriteAsync(buffer.AsMemory(0, + read), + cancellationToken) + .ConfigureAwait(false); + } while (read != 0); + + channel.Writer.Complete(); + + await add.ConfigureAwait(false); + sentResults_.Add(result.ResultId); + } + + return new NotifyResultDataResponse + { + ResultIds = + { + request.Ids.Select(identifier => identifier.ResultId), + }, + }; + } + /// public void Dispose() { } + + /// + public async Task GetResourceData(DataRequest request, + CancellationToken cancellationToken) + { + using var _ = logger_.BeginNamedScope(nameof(GetResourceData), + ("taskId", taskData_.TaskId), + ("sessionId", sessionData_.SessionId)); + + if (string.IsNullOrEmpty(request.CommunicationToken)) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Missing communication token"), + "Missing communication token"); + } + + if (request.CommunicationToken != token_) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Wrong communication token"), + "Wrong communication token"); + } + + try + { + await using (var fs = new FileStream(Path.Combine(folder_, + request.ResultId), + FileMode.OpenOrCreate)) + { + await using var w = new BinaryWriter(fs); + await foreach (var chunk in objectStorage_.GetValuesAsync(request.ResultId, + cancellationToken) + .ConfigureAwait(false)) + { + w.Write(chunk); + } + } + + + return new DataResponse + { + ResultId = request.ResultId, + }; + } + catch (ObjectDataNotFoundException) + { + throw new RpcException(new Status(StatusCode.NotFound, + "Data not found"), + "Data not found"); + } + } + + /// + public Task GetCommonData(DataRequest request, + CancellationToken cancellationToken) + { + using var _ = logger_.BeginNamedScope(nameof(GetCommonData), + ("taskId", taskData_.TaskId), + ("sessionId", sessionData_.SessionId)); + + if (string.IsNullOrEmpty(request.CommunicationToken)) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Missing communication token"), + "Missing communication token"); + } + + if (request.CommunicationToken != token_) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Wrong communication token"), + "Wrong communication token"); + } + + throw new NotImplementedException("Common data are not implemented yet"); + } + + /// + public Task GetDirectData(DataRequest request, + CancellationToken cancellationToken) + { + using var _ = logger_.BeginNamedScope(nameof(GetDirectData), + ("taskId", taskData_.TaskId), + ("sessionId", sessionData_.SessionId)); + + if (string.IsNullOrEmpty(request.CommunicationToken)) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Missing communication token"), + "Missing communication token"); + } + + if (request.CommunicationToken != token_) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, + "Wrong communication token"), + "Wrong communication token"); + } + + throw new NotImplementedException("Direct data are not implemented yet"); + } } diff --git a/Common/src/gRPC/Services/GrpcAgentService.cs b/Common/src/gRPC/Services/GrpcAgentService.cs index 9182a864d..0c40422e1 100644 --- a/Common/src/gRPC/Services/GrpcAgentService.cs +++ b/Common/src/gRPC/Services/GrpcAgentService.cs @@ -57,71 +57,58 @@ public override async Task CreateTask(IAsyncStreamReader responseStream, - ServerCallContext context) + public override async Task GetCommonData(DataRequest request, + ServerCallContext context) { if (agent_ != null) { - await agent_.GetCommonData(request, - responseStream, - context.CancellationToken) - .ConfigureAwait(false); - } - else - { - await responseStream.WriteAsync(new DataReply - { - Error = "No task is accepting request", - }) - .ConfigureAwait(false); + return await agent_.GetCommonData(request, + context.CancellationToken) + .ConfigureAwait(false); } + + throw new RpcException(new Status(StatusCode.Unavailable, + "No task is accepting request"), + "No task is accepting request"); } - public override async Task GetResourceData(DataRequest request, - IServerStreamWriter responseStream, - ServerCallContext context) + public override async Task GetResourceData(DataRequest request, + ServerCallContext context) { if (agent_ != null) { - await agent_.GetResourceData(request, - responseStream, - context.CancellationToken) - .ConfigureAwait(false); - } - else - { - await responseStream.WriteAsync(new DataReply - { - Error = "No task is accepting request", - }) - .ConfigureAwait(false); + return await agent_.GetResourceData(request, + context.CancellationToken) + .ConfigureAwait(false); } + + throw new RpcException(new Status(StatusCode.Unavailable, + "No task is accepting request"), + "No task is accepting request"); } - public override async Task SendResult(IAsyncStreamReader requestStream, - ServerCallContext context) + public override async Task GetDirectData(DataRequest request, + ServerCallContext context) { if (agent_ != null) { - return await agent_.SendResult(requestStream, - context.CancellationToken) + return await agent_.GetDirectData(request, + context.CancellationToken) .ConfigureAwait(false); } - return new ResultReply - { - Error = "No task is accepting request", - }; + throw new RpcException(new Status(StatusCode.Unavailable, + "No task is accepting request"), + "No task is accepting request"); } - public override async Task CreateResultsMetaData(CreateResultsMetaDataRequest request, - ServerCallContext context) + public override async Task NotifyResultData(NotifyResultDataRequest request, + ServerCallContext context) { if (agent_ != null) { - return await agent_.CreateResultsMetaData(request, - context.CancellationToken) + return await agent_.NotifyResultData(request, + context.CancellationToken) .ConfigureAwait(false); } @@ -130,13 +117,13 @@ public override async Task CreateResultsMetaData( "No task is accepting request"); } - public override async Task SubmitTasks(SubmitTasksRequest request, - ServerCallContext context) + public override async Task CreateResultsMetaData(CreateResultsMetaDataRequest request, + ServerCallContext context) { if (agent_ != null) { - return await agent_.SubmitTasks(request, - context.CancellationToken) + return await agent_.CreateResultsMetaData(request, + context.CancellationToken) .ConfigureAwait(false); } @@ -145,13 +132,13 @@ public override async Task SubmitTasks(SubmitTasksRequest r "No task is accepting request"); } - public override async Task UploadResultData(IAsyncStreamReader requestStream, - ServerCallContext context) + public override async Task SubmitTasks(SubmitTasksRequest request, + ServerCallContext context) { if (agent_ != null) { - return await agent_.UploadResultData(requestStream, - context.CancellationToken) + return await agent_.SubmitTasks(request, + context.CancellationToken) .ConfigureAwait(false); } diff --git a/Common/src/gRPC/Services/IAgent.cs b/Common/src/gRPC/Services/IAgent.cs index 8395fdb20..3b63c0a7f 100644 --- a/Common/src/gRPC/Services/IAgent.cs +++ b/Common/src/gRPC/Services/IAgent.cs @@ -51,54 +51,37 @@ Task CreateTask(IAsyncStreamReader requestSt CancellationToken cancellationToken); /// - /// Get Common data from data storage + /// Get Common data from data storage as file in shared folder /// /// Request specifying the data to retrieve - /// Response containing the data that will be sent to the worker /// Token used to cancel the execution of the method /// - /// Task representing the asynchronous execution of the method + /// Response to send to the worker /// - Task GetCommonData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken); + Task GetCommonData(DataRequest request, + CancellationToken cancellationToken); /// - /// Get Direct data from user + /// Get Direct data from user as file in shared folder /// /// Request specifying the data to retrieve - /// Response containing the data that will be sent to the worker /// Token used to cancel the execution of the method /// - /// Task representing the asynchronous execution of the method + /// Response to send to the worker /// - Task GetDirectData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken); + Task GetDirectData(DataRequest request, + CancellationToken cancellationToken); /// - /// Get Resource data from data storage + /// Get Resource data from data storage as file in shared folder /// /// Request specifying the data to retrieve - /// Response containing the data that will be sent to the worker /// Token used to cancel the execution of the method /// - /// Task representing the asynchronous execution of the method + /// Response to send to the worker /// - Task GetResourceData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken); - - /// - /// Put the results created in the task into data storage and mark them as available in data table - /// - /// Requests containing the results - /// Token used to cancel the execution of the method - /// - /// Reply sent to the worker describing the status of the execution of the received requests - /// - Task SendResult(IAsyncStreamReader requestStream, - CancellationToken cancellationToken); + Task GetResourceData(DataRequest request, + CancellationToken cancellationToken); /// /// Create results metadata @@ -123,24 +106,24 @@ Task SubmitTasks(SubmitTasksRequest request, CancellationToken cancellationToken); /// - /// Associate data to an existing result + /// Create a result (with data and metadata) /// - /// Requests containing the result data + /// Requests containing the result to create and the data /// Token used to cancel the execution of the method /// - /// Reply sent to the worker describing the status of the execution of the received requests + /// Reply sent to the worker with the id of the created result /// - Task UploadResultData(IAsyncStreamReader requestStream, - CancellationToken cancellationToken); + Task CreateResults(CreateResultsRequest request, + CancellationToken cancellationToken); /// - /// Create a result (with data and metadata) + /// Put the results created as a file in the task into object storage /// - /// Requests containing the result to create and the data + /// Requests containing the results /// Token used to cancel the execution of the method /// - /// Reply sent to the worker with the id of the created result + /// Reply sent to the worker describing the status of the execution of the received requests /// - Task CreateResults(CreateResultsRequest request, - CancellationToken cancellationToken); + Task NotifyResultData(NotifyResultDataRequest request, + CancellationToken cancellationToken); } diff --git a/Common/tests/FullIntegration/FullyIntegratedTest.cs b/Common/tests/FullIntegration/FullyIntegratedTest.cs deleted file mode 100644 index 638f543a9..000000000 --- a/Common/tests/FullIntegration/FullyIntegratedTest.cs +++ /dev/null @@ -1,213 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Linq; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Submitter; -using ArmoniK.Core.Common.Exceptions; -using ArmoniK.Core.Common.Stream.Worker; -using ArmoniK.Core.Common.Tests.Helpers; - -using Google.Protobuf; -using Google.Protobuf.WellKnownTypes; - -using Moq; - -using NUnit.Framework; - -using Empty = ArmoniK.Api.gRPC.V1.Empty; -using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; - -namespace ArmoniK.Core.Common.Tests.FullIntegration; - -[TestFixture] -public class FullyIntegratedTest -{ - [SetUp] - public void Setup() - { - } - - [TearDown] - public async Task TearDown() - { - if (helper_ != null) - { - await helper_.StopServer() - .ConfigureAwait(false); - } - - helper_ = null; - } - - private GrpcSubmitterServiceHelper? helper_; - - - [Test] - [Ignore("Need to be reworked")] - public async Task GetServiceConfigurationShouldSucceed() - { - var mockStreamHandler = new Mock(); - using var testServiceProvider = new TestPollingAgentProvider(mockStreamHandler.Object); - - helper_ = new GrpcSubmitterServiceHelper(testServiceProvider.Submitter); - var client = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(await helper_.CreateChannel() - .ConfigureAwait(false)); - - var response = client.GetServiceConfiguration(new Empty()); - - Assert.AreEqual(PayloadConfiguration.MaxChunkSize, - response.DataChunkMaxSize); - } - - [Test] - [Ignore("Need to be reworked")] - public async Task FullRunShouldSucceed() - { - var mockStreamHandler = new WorkerStreamHandlerFullTest(); - - using var testServiceProvider = new TestPollingAgentProvider(mockStreamHandler); - helper_ = new GrpcSubmitterServiceHelper(testServiceProvider.Submitter); - var client = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(await helper_.CreateChannel() - .ConfigureAwait(false)); - const string sessionId = "MySession"; - const string taskId = "MyTask"; - - var taskOptions = new TaskOptions - { - MaxDuration = Duration.FromTimeSpan(TimeSpan.FromSeconds(10)), - MaxRetries = 4, - Priority = 2, - }; - client.CreateSession(new CreateSessionRequest - { - DefaultTaskOption = taskOptions, - }); - - var taskRequest = new TaskRequest - { - Payload = ByteString.CopyFromUtf8("taskPayload"), - }; - - taskRequest.ExpectedOutputKeys.Add(taskId); - - var taskCreationReply = client.CreateSmallTasks(new CreateSmallTaskRequest - { - SessionId = sessionId, - TaskOptions = taskOptions, - TaskRequests = - { - new[] - { - taskRequest, - }, - }, - }); - Assert.AreEqual(CreateTaskReply.ResponseOneofCase.CreationStatusList, - taskCreationReply.ResponseCase); - - var taskWaitForCompletion = client.WaitForCompletion(new WaitRequest - { - Filter = new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskId, - }, - }, - }, - }); - - var taskStatus = taskWaitForCompletion.Values.Single(); - Assert.AreEqual(1, - taskStatus.Count); - Assert.AreEqual(TaskStatus.Completed, - taskStatus.Status); - } - - [Test] - [Ignore("Unstable error management in Pollster")] - public async Task TaskRetryShouldSucceed() - { - var mockStreamHandler = new WorkerStreamHandlerErrorRetryTest(new ArmoniKException()); - - using var testServiceProvider = new TestPollingAgentProvider(mockStreamHandler); - helper_ = new GrpcSubmitterServiceHelper(testServiceProvider.Submitter); - var client = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(await helper_.CreateChannel() - .ConfigureAwait(false)); - const string sessionId = "MySession"; - const string taskId = "MyTask"; - - var taskOptions = new TaskOptions - { - MaxDuration = Duration.FromTimeSpan(TimeSpan.FromSeconds(10)), - MaxRetries = 4, - Priority = 2, - }; - client.CreateSession(new CreateSessionRequest - { - DefaultTaskOption = taskOptions, - }); - - var taskRequest = new TaskRequest - { - Payload = ByteString.CopyFromUtf8("taskPayload"), - }; - - taskRequest.ExpectedOutputKeys.Add(taskId); - - var taskCreationReply = client.CreateSmallTasks(new CreateSmallTaskRequest - { - SessionId = sessionId, - TaskOptions = taskOptions, - TaskRequests = - { - new[] - { - taskRequest, - }, - }, - }); - Assert.AreEqual(CreateTaskReply.ResponseOneofCase.CreationStatusList, - taskCreationReply.ResponseCase); - - var taskWaitForCompletion = client.WaitForCompletion(new WaitRequest - { - Filter = new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskId, - }, - }, - }, - }); - - var taskStatus = taskWaitForCompletion.Values.Single(); - Assert.AreEqual(1, - taskStatus.Count); - Assert.AreEqual(TaskStatus.Completed, - taskStatus.Status); - } -} diff --git a/Common/tests/FullIntegration/WorkerStreamHandlerBase.cs b/Common/tests/FullIntegration/WorkerStreamHandlerBase.cs deleted file mode 100644 index 0185f32ae..000000000 --- a/Common/tests/FullIntegration/WorkerStreamHandlerBase.cs +++ /dev/null @@ -1,65 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1.Worker; -using ArmoniK.Core.Base.DataStructures; -using ArmoniK.Core.Common.Storage; -using ArmoniK.Core.Common.Stream.Worker; -using ArmoniK.Core.Common.Tests.Helpers; -using ArmoniK.Core.Common.Utils; - -using Microsoft.Extensions.Diagnostics.HealthChecks; - -namespace ArmoniK.Core.Common.Tests.FullIntegration; - -public abstract class WorkerStreamHandlerBase : IWorkerStreamHandler -{ - protected readonly ChannelAsyncPipe ChannelAsyncPipe; - protected readonly List TaskList; - - protected WorkerStreamHandlerBase() - { - TaskList = new List(); - ChannelAsyncPipe = new ChannelAsyncPipe(new ProcessReply()); - } - - public Task Check(HealthCheckTag tag) - => Task.FromResult(HealthCheckResult.Healthy()); - - public Task Init(CancellationToken cancellationToken) - => Task.CompletedTask; - - public void Dispose() - { - TaskList.ForEach(task => task.Dispose()); - GC.SuppressFinalize(this); - } - - public abstract void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken); - - public IAsyncPipe Pipe - => ChannelAsyncPipe; - - public Queue WorkerReturn() - => throw new NotImplementedException(); -} diff --git a/Common/tests/FullIntegration/WorkerStreamHandlerErrorRetryTest.cs b/Common/tests/FullIntegration/WorkerStreamHandlerErrorRetryTest.cs deleted file mode 100644 index f077f8014..000000000 --- a/Common/tests/FullIntegration/WorkerStreamHandlerErrorRetryTest.cs +++ /dev/null @@ -1,62 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; -using ArmoniK.Core.Common.Storage; - -using Output = ArmoniK.Api.gRPC.V1.Output; - -namespace ArmoniK.Core.Common.Tests.FullIntegration; - -internal class WorkerStreamHandlerErrorRetryTest : WorkerStreamHandlerBase -{ - private readonly Exception exception_; - - public WorkerStreamHandlerErrorRetryTest(Exception exception) - => exception_ = exception; - - public override void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - { - if (!taskData.TaskId.Contains("###")) - { - throw exception_; - } - - var task = new Task(async () => - { - var request = await ChannelAsyncPipe.Reverse.ReadAsync(cancellationToken) - .ConfigureAwait(false); - - await ChannelAsyncPipe.Reverse.WriteAsync(new ProcessReply - { - Output = new Output - { - Ok = new Empty(), - }, - }) - .ConfigureAwait(false); - }); - TaskList.Add(task); - task.Start(); - } -} diff --git a/Common/tests/FullIntegration/WorkerStreamHandlerFullTest.cs b/Common/tests/FullIntegration/WorkerStreamHandlerFullTest.cs deleted file mode 100644 index 94406ef17..000000000 --- a/Common/tests/FullIntegration/WorkerStreamHandlerFullTest.cs +++ /dev/null @@ -1,63 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; -using ArmoniK.Core.Common.Storage; - -using Output = ArmoniK.Api.gRPC.V1.Output; - -namespace ArmoniK.Core.Common.Tests.FullIntegration; - -public class WorkerStreamHandlerFullTest : WorkerStreamHandlerBase -{ - public override void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - { - Console.WriteLine(taskData); - - var task = new Task(async () => - { - //var requests = pipe_.Reverse.Reader.GetAsyncEnumerator(cancellationToken); - //while (await requests.MoveNextAsync().ConfigureAwait(false)) - //{ - // Console.WriteLine(requests.Current); - //} - - Console.WriteLine(await ChannelAsyncPipe.Reverse.ReadAsync(cancellationToken) - .ConfigureAwait(false)); - - await ChannelAsyncPipe.Reverse.WriteAsync(new ProcessReply - { - Output = new Output - { - Ok = new Empty(), - }, - }) - .ConfigureAwait(false); - - await ChannelAsyncPipe.Reverse.CompleteAsync() - .ConfigureAwait(false); - }); - TaskList.Add(task); - task.Start(); - } -} diff --git a/Common/tests/Helpers/ChannelAsyncPipe.cs b/Common/tests/Helpers/ChannelAsyncPipe.cs deleted file mode 100644 index 9f655110a..000000000 --- a/Common/tests/Helpers/ChannelAsyncPipe.cs +++ /dev/null @@ -1,73 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System.Collections.Generic; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; - -using ArmoniK.Core.Common.Utils; - -namespace ArmoniK.Core.Common.Tests.Helpers; - -public class ChannelAsyncPipe : IAsyncPipe - where TWriteMessage : new() - where TReadMessage : new() -{ - private readonly TReadMessage message_; - private readonly Channel readerChannel_ = Channel.CreateUnbounded(); - private readonly Channel writerChannel_ = Channel.CreateUnbounded(); - - public ChannelAsyncPipe(TReadMessage message) - => message_ = message; - - private ChannelAsyncPipe(Channel readerChannel, - Channel writerChannel, - TReadMessage message) - { - readerChannel_ = readerChannel; - writerChannel_ = writerChannel; - message_ = message; - } - - public IAsyncPipe Reverse - => new ChannelAsyncPipe(writerChannel_, - readerChannel_, - new TWriteMessage()); - - public Task ReadAsync(CancellationToken cancellationToken) - => Task.FromResult(message_); - - public async Task WriteAsync(TWriteMessage message) - => await writerChannel_.Writer.WriteAsync(message) - .ConfigureAwait(false); - - public async Task WriteAsync(IEnumerable messages) - { - foreach (var message in messages) - { - await writerChannel_.Writer.WriteAsync(message) - .ConfigureAwait(false); - } - } - - public Task CompleteAsync() - { - writerChannel_.Writer.Complete(); - return Task.CompletedTask; - } -} diff --git a/Common/tests/Helpers/ExceptionAsyncPipe.cs b/Common/tests/Helpers/ExceptionAsyncPipe.cs deleted file mode 100644 index c22125ccc..000000000 --- a/Common/tests/Helpers/ExceptionAsyncPipe.cs +++ /dev/null @@ -1,53 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1.Worker; -using ArmoniK.Core.Common.Utils; - -namespace ArmoniK.Core.Common.Tests.Helpers; - -public class ExceptionAsyncPipe : IAsyncPipe - where T : Exception, new() -{ - private readonly int delay_; - - public ExceptionAsyncPipe(int delay) - => delay_ = delay; - - public async Task ReadAsync(CancellationToken cancellationToken) - { - await Task.Delay(TimeSpan.FromMilliseconds(delay_), - cancellationToken) - .ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); - throw new T(); - } - - public Task WriteAsync(ProcessRequest message) - => Task.CompletedTask; - - public Task WriteAsync(IEnumerable message) - => Task.CompletedTask; - - public Task CompleteAsync() - => Task.CompletedTask; -} diff --git a/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs b/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs index f95153ed3..10369436b 100644 --- a/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs +++ b/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs @@ -21,9 +21,7 @@ using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base.DataStructures; -using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; -using ArmoniK.Core.Common.Utils; using Microsoft.Extensions.Diagnostics.HealthChecks; @@ -47,9 +45,14 @@ public void Dispose() { } - public IAsyncPipe? Pipe { get; private set; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - => Pipe = new ExceptionAsyncPipe(delay_); + public async Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) + { + await Task.Delay(TimeSpan.FromMilliseconds(delay_), + cancellationToken) + .ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); + throw new T(); + } } diff --git a/Common/tests/Helpers/SimpleAgent.cs b/Common/tests/Helpers/SimpleAgent.cs index dc3b41da8..8ffa9e716 100644 --- a/Common/tests/Helpers/SimpleAgent.cs +++ b/Common/tests/Helpers/SimpleAgent.cs @@ -35,25 +35,18 @@ public Task CreateTask(IAsyncStreamReader re CancellationToken cancellationToken) => Task.FromResult(new CreateTaskReply()); - public Task GetCommonData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) + public Task GetCommonData(DataRequest request, + CancellationToken cancellationToken) => throw new NotImplementedException(); - public Task GetDirectData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) + public Task GetDirectData(DataRequest request, + CancellationToken cancellationToken) => throw new NotImplementedException(); - public Task GetResourceData(DataRequest request, - IServerStreamWriter responseStream, - CancellationToken cancellationToken) + public Task GetResourceData(DataRequest request, + CancellationToken cancellationToken) => throw new NotImplementedException(); - public Task SendResult(IAsyncStreamReader requestStream, - CancellationToken cancellationToken) - => Task.FromResult(new ResultReply()); - public Task CreateResultsMetaData(CreateResultsMetaDataRequest request, CancellationToken cancellationToken) => Task.FromResult(new CreateResultsMetaDataResponse()); @@ -62,14 +55,15 @@ public Task SubmitTasks(SubmitTasksRequest request, CancellationToken cancellationToken) => Task.FromResult(new SubmitTasksResponse()); - public Task UploadResultData(IAsyncStreamReader requestStream, - CancellationToken cancellationToken) - => Task.FromResult(new UploadResultDataResponse()); public Task CreateResults(CreateResultsRequest request, CancellationToken cancellationToken) => Task.FromResult(new CreateResultsResponse()); + public Task NotifyResultData(NotifyResultDataRequest request, + CancellationToken cancellationToken) + => Task.FromResult(new NotifyResultDataResponse()); + public void Dispose() => GC.SuppressFinalize(this); } diff --git a/Common/tests/Helpers/SimpleAgentHandler.cs b/Common/tests/Helpers/SimpleAgentHandler.cs index 48c55258a..ea9897723 100644 --- a/Common/tests/Helpers/SimpleAgentHandler.cs +++ b/Common/tests/Helpers/SimpleAgentHandler.cs @@ -37,6 +37,7 @@ public Task Start(string token, ILogger logger, SessionData sessionData, TaskData taskData, + string folder, CancellationToken cancellationToken) { Agent = new SimpleAgent(); diff --git a/Common/tests/Helpers/SimpleWorkerStreamHandler.cs b/Common/tests/Helpers/SimpleWorkerStreamHandler.cs index c57742a32..a7a45e7b6 100644 --- a/Common/tests/Helpers/SimpleWorkerStreamHandler.cs +++ b/Common/tests/Helpers/SimpleWorkerStreamHandler.cs @@ -22,14 +22,10 @@ using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base.DataStructures; -using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; -using ArmoniK.Core.Common.Utils; using Microsoft.Extensions.Diagnostics.HealthChecks; -using Output = ArmoniK.Api.gRPC.V1.Output; - namespace ArmoniK.Core.Common.Tests.Helpers; public class SimpleWorkerStreamHandler : IWorkerStreamHandler @@ -43,16 +39,14 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - => Pipe = new ChannelAsyncPipe(new ProcessReply - { - CommunicationToken = "", - Output = new Output - { - Ok = new Empty(), - }, - }); - - public IAsyncPipe? Pipe { get; private set; } + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) + => Task.FromResult(new ProcessReply + { + Output = new Output + { + Ok = new Empty(), + }, + }); } diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs index 0f9e31eec..2530f2656 100644 --- a/Common/tests/Helpers/TestPollsterProvider.cs +++ b/Common/tests/Helpers/TestPollsterProvider.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Threading; using ArmoniK.Api.Common.Options; @@ -109,6 +110,16 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, { $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.GraceDelay)}", "00:00:02" }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.SharedCacheFolder)}", + Path.Combine(Path.GetTempPath(), + "data") + }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.InternalCacheFolder)}", + Path.Combine(Path.GetTempPath(), + "internal") + }, }; Console.WriteLine(minimalConfig.ToJson()); diff --git a/Common/tests/Helpers/TestTaskHandlerProvider.cs b/Common/tests/Helpers/TestTaskHandlerProvider.cs index 077c3ba67..31441542c 100644 --- a/Common/tests/Helpers/TestTaskHandlerProvider.cs +++ b/Common/tests/Helpers/TestTaskHandlerProvider.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Threading; using ArmoniK.Api.Common.Options; @@ -113,6 +114,16 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, { $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.GraceDelay)}", "00:00:02" }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.SharedCacheFolder)}", + Path.Combine(Path.GetTempPath(), + "data") + }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.InternalCacheFolder)}", + Path.Combine(Path.GetTempPath(), + "internal") + }, }; Console.WriteLine(minimalConfig.ToJson()); diff --git a/Common/tests/Pollster/AgentTest.cs b/Common/tests/Pollster/AgentTest.cs index 81363525c..446eb016d 100644 --- a/Common/tests/Pollster/AgentTest.cs +++ b/Common/tests/Pollster/AgentTest.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Text; using System.Threading; @@ -35,6 +36,8 @@ using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Grpc.Core; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; @@ -42,7 +45,6 @@ using NUnit.Framework; using Agent = ArmoniK.Core.Common.gRPC.Services.Agent; -using Result = ArmoniK.Core.Common.Storage.Result; using TaskOptions = ArmoniK.Api.gRPC.V1.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -128,6 +130,7 @@ public Task PushMessagesAsync(IEnumerable messages, private class AgentHolder : IDisposable { public readonly Agent Agent; + public readonly string Folder; public readonly IObjectStorage ObjectStorage; private readonly TestDatabaseProvider prov_; public readonly MyPushQueueStorage QueueStorage; @@ -203,6 +206,10 @@ public AgentHolder() CancellationToken.None) .Wait(); + Folder = Path.Combine(Path.GetTempPath(), + "data"); + Directory.CreateDirectory(Folder); + var createdTasks = submitter.CreateTasks(Session, Session, Options.ToTaskOptions(), @@ -291,6 +298,7 @@ public AgentHolder() TaskTable, sessionData, TaskData, + Folder, Token, prov_.GetRequiredService>()); } @@ -322,121 +330,62 @@ public async Task WrongTokens(string token) Assert.AreEqual(CreateTaskReply.ResponseOneofCase.Error, createTaskReply.ResponseCase); - var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] - { - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = token, - }, - }), - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(ResultReply.TypeOneofCase.Error, - resultReply.TypeCase); - var commonData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetCommonData(new DataRequest - { - CommunicationToken = token, - }, - commonData, - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(DataReply.TypeOneofCase.Error, - commonData.Messages.Single() - .TypeCase); - - var directData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetDirectData(new DataRequest - { - CommunicationToken = token, - }, - directData, - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(DataReply.TypeOneofCase.Error, - directData.Messages.Single() - .TypeCase); - - var resourceData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetResourceData(new DataRequest - { - CommunicationToken = token, - }, - resourceData, - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(DataReply.TypeOneofCase.Error, - resourceData.Messages.Single() - .TypeCase); + Assert.ThrowsAsync(() => holder.Agent.NotifyResultData(new NotifyResultDataRequest + { + CommunicationToken = token, + }, + CancellationToken.None)); + + Assert.ThrowsAsync(() => holder.Agent.GetCommonData(new DataRequest + { + CommunicationToken = token, + }, + CancellationToken.None)); + + Assert.ThrowsAsync(() => holder.Agent.GetDirectData(new DataRequest + { + CommunicationToken = token, + }, + CancellationToken.None)); + + Assert.ThrowsAsync(() => holder.Agent.GetResourceData(new DataRequest + { + CommunicationToken = token, + }, + CancellationToken.None)); } [Test] - public async Task UnImplementedData() + public void UnImplementedData() { using var holder = new AgentHolder(); - var commonData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetCommonData(new DataRequest - { - CommunicationToken = holder.Token, - }, - commonData, - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(DataReply.TypeOneofCase.Error, - commonData.Messages.Single() - .TypeCase); - - var directData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetDirectData(new DataRequest - { - CommunicationToken = holder.Token, - }, - directData, - CancellationToken.None) - .ConfigureAwait(false); - - Assert.AreEqual(DataReply.TypeOneofCase.Error, - directData.Messages.Single() - .TypeCase); + Assert.ThrowsAsync(() => holder.Agent.GetCommonData(new DataRequest + { + CommunicationToken = holder.Token, + }, + CancellationToken.None)); + + Assert.ThrowsAsync(() => holder.Agent.GetDirectData(new DataRequest + { + CommunicationToken = holder.Token, + }, + CancellationToken.None)); } [Test] public async Task MissingResourceData() { - using var holder = new AgentHolder(); - var resourceData = new TestHelperServerStreamWriter(); - - await holder.Agent.GetResourceData(new DataRequest - { - CommunicationToken = holder.Token, - Key = "DataNotExisting", - }, - resourceData, - CancellationToken.None) - .ConfigureAwait(false); + using var holder = new AgentHolder(); - Assert.AreEqual(DataReply.TypeOneofCase.Init, - resourceData.Messages.Single() - .TypeCase); - Assert.AreEqual(DataReply.Types.Init.HasResultOneofCase.Error, - resourceData.Messages.Single() - .Init.HasResultCase); - Assert.AreEqual("Key not found", - resourceData.Messages.Single() - .Init.Error); + Assert.ThrowsAsync(() => holder.Agent.GetResourceData(new DataRequest + { + CommunicationToken = holder.Token, + ResultId = "DataNotExisting", + }, + CancellationToken.None)); } [Test] @@ -448,58 +397,29 @@ public async Task SendResultShouldSucceed() holder.QueueStorage.Messages.SelectMany(pair => pair.Value) .Count()); - var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] - { - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = holder.Token, - Init = new InitKeyedDataStream - { - Key = ExpectedOutput1, - }, - }, - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - Data = ByteString.CopyFromUtf8("Data1"), - }, - }, - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - Data = ByteString.CopyFromUtf8("Data2"), - }, - }, - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - DataComplete = true, - }, - }, - new Api.gRPC.V1.Agent.Result - { - CommunicationToken = holder.Token, - Init = new InitKeyedDataStream - { - LastResult = true, - }, - }, - }), - CancellationToken.None) - .ConfigureAwait(false); + await File.WriteAllBytesAsync(Path.Combine(holder.Folder, + ExpectedOutput1), + Encoding.ASCII.GetBytes("Data1Data2")) + .ConfigureAwait(false); + + await holder.Agent.NotifyResultData(new NotifyResultDataRequest + { + CommunicationToken = holder.Token, + Ids = + { + new NotifyResultDataRequest.Types.ResultIdentifier + { + ResultId = ExpectedOutput1, + SessionId = holder.Session, + }, + }, + }, + CancellationToken.None) + .ConfigureAwait(false); await holder.Agent.FinalizeTaskCreation(CancellationToken.None) .ConfigureAwait(false); - Assert.AreEqual(ResultReply.TypeOneofCase.Ok, - resultReply.TypeCase); - var resultData = await holder.ResultTable.GetResult(holder.Session, ExpectedOutput1, CancellationToken.None) @@ -528,8 +448,7 @@ await holder.Agent.FinalizeTaskCreation(CancellationToken.None) Assert.Contains(holder.TaskWithDependencies2, holder.QueueStorage.Messages[Partition]); Assert.AreEqual(2, - holder.QueueStorage.Messages[Partition] - .Count); + holder.QueueStorage.Messages[Partition].Count); var taskData1 = await holder.TaskTable.ReadTaskAsync(holder.TaskWithDependencies1, CancellationToken.None) @@ -720,8 +639,7 @@ await holder.Agent.FinalizeTaskCreation(CancellationToken.None) .ConfigureAwait(false); Assert.AreEqual(3, - holder.QueueStorage.Messages[Partition] - .Count); + holder.QueueStorage.Messages[Partition].Count); taskData3 = await holder.TaskTable.ReadTaskAsync(taskId3, CancellationToken.None) @@ -782,8 +700,7 @@ await holder.Agent.FinalizeTaskCreation(CancellationToken.None) [Test] public async Task GetResourceDataShouldSucceed() { - using var holder = new AgentHolder(); - var resourceData = new TestHelperServerStreamWriter(); + using var holder = new AgentHolder(); await holder.ObjectStorage.AddOrUpdateAsync("ResourceData", new List @@ -798,27 +715,16 @@ await holder.ObjectStorage.AddOrUpdateAsync("ResourceData", await holder.Agent.GetResourceData(new DataRequest { CommunicationToken = holder.Token, - Key = "ResourceData", + ResultId = "ResourceData", }, - resourceData, CancellationToken.None) .ConfigureAwait(false); - foreach (var message in resourceData.Messages) - { - Console.WriteLine(message); - } - - Assert.AreEqual(3, - resourceData.Messages.Count); - Assert.AreEqual("Data1", - resourceData.Messages[0] - .Data.Data); - Assert.AreEqual("Data2", - resourceData.Messages[1] - .Data.Data); - Assert.IsTrue(resourceData.Messages[2] - .Data.DataComplete); + var bytes = await File.ReadAllBytesAsync(Path.Combine(holder.Folder, + "ResourceData")) + .ConfigureAwait(false); + Assert.AreEqual(Encoding.ASCII.GetBytes("Data1Data2"), + bytes); } [Test] @@ -1049,29 +955,25 @@ public async Task SubmitTasksUploadPayloadShouldSucceed(bool optionsNull) .ConfigureAwait(false); - await holder.Agent.UploadResultData(new TestHelperAsyncStreamReader(new List - { - new() - { - CommunicationToken = holder.Token, - Id = new UploadResultDataRequest.Types.ResultIdentifier - { - ResultId = eok.Results.Last() - .ResultId, - SessionId = holder.Session, - }, - }, - new() - { - CommunicationToken = holder.Token, - DataChunk = ByteString.CopyFromUtf8("DataPart1"), - }, - new() - { - CommunicationToken = holder.Token, - DataChunk = ByteString.CopyFromUtf8("DataPart2"), - }, - }), + await File.WriteAllBytesAsync(Path.Combine(holder.Folder, + eok.Results.Last() + .ResultId), + Encoding.ASCII.GetBytes("Data1Data2")) + .ConfigureAwait(false); + + await holder.Agent.NotifyResultData(new NotifyResultDataRequest + { + CommunicationToken = holder.Token, + Ids = + { + new NotifyResultDataRequest.Types.ResultIdentifier + { + ResultId = eok.Results.Last() + .ResultId, + SessionId = holder.Session, + }, + }, + }, CancellationToken.None) .ConfigureAwait(false); diff --git a/Common/tests/Pollster/DataPrefetcherTest.cs b/Common/tests/Pollster/DataPrefetcherTest.cs index 54eaacd92..71932b8aa 100644 --- a/Common/tests/Pollster/DataPrefetcherTest.cs +++ b/Common/tests/Pollster/DataPrefetcherTest.cs @@ -18,12 +18,11 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Pollster; using ArmoniK.Core.Common.Storage; @@ -35,8 +34,6 @@ using NUnit.Framework; -using Output = ArmoniK.Core.Common.Storage.Output; -using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; namespace ArmoniK.Core.Common.Tests.Pollster; @@ -83,407 +80,55 @@ public async Task EmptyPayloadAndOneDependency() const string podId = "PodId"; const string podName = "PodName"; const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); - var computeRequests = res.ToArray(); - foreach (var request in computeRequests) - { - Console.WriteLine(request); - } - - Assert.AreEqual(computeRequests[0] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.InitRequest); - Assert.AreEqual(computeRequests[0] - .InitRequest.SessionId, - sessionId); - Assert.AreEqual(computeRequests[0] - .InitRequest.TaskId, - taskId); - Assert.AreEqual(computeRequests[0] - .InitRequest.ExpectedOutputKeys.First(), - output1); - - Assert.AreEqual(computeRequests[1] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Payload); - Assert.AreEqual(computeRequests[1] - .Payload.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[2] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Payload); - Assert.AreEqual(computeRequests[2] - .Payload.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[3] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Payload); - Assert.AreEqual(computeRequests[3] - .Payload.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[4] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Payload); - Assert.AreEqual(computeRequests[4] - .Payload.TypeCase, - DataChunk.TypeOneofCase.DataComplete); - Assert.IsTrue(computeRequests[4] - .Payload.DataComplete); - - Assert.AreEqual(computeRequests[5] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.InitData); - Assert.AreEqual(computeRequests[5] - .InitData.Key, - dependency1); - - Assert.AreEqual(computeRequests[6] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Data); - Assert.AreEqual(computeRequests[6] - .Data.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[7] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Data); - Assert.AreEqual(computeRequests[7] - .Data.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[8] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Data); - Assert.AreEqual(computeRequests[8] - .Data.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[9] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Data); - Assert.AreEqual(computeRequests[9] - .Data.TypeCase, - DataChunk.TypeOneofCase.Data); - - Assert.AreEqual(computeRequests[10] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.Data); - Assert.AreEqual(computeRequests[10] - .Data.TypeCase, - DataChunk.TypeOneofCase.DataComplete); - Assert.IsTrue(computeRequests[10] - .Data.DataComplete); - - Assert.AreEqual(computeRequests[11] - .TypeCase, - ProcessRequest.Types.ComputeRequest.TypeOneofCase.InitData); - Assert.AreEqual(computeRequests[11] - .InitData.TypeCase, - ProcessRequest.Types.ComputeRequest.Types.InitData.TypeOneofCase.LastData); - Assert.IsTrue(computeRequests[11] - .InitData.LastData); - } - - [Test] - public async Task EmptyPayloadAndOneDependencyStateMachine() - { - var mockObjectStorage = new Mock(); - mockObjectStorage.Setup(x => x.GetValuesAsync(It.IsAny(), - CancellationToken.None)) - .Returns((string _, - CancellationToken _) => new List - { - Convert.FromBase64String("1111"), - Convert.FromBase64String("2222"), - Convert.FromBase64String("3333"), - Convert.FromBase64String("4444"), - }.ToAsyncEnumerable()); - - var loggerFactory = new LoggerFactory(); - - var dataPrefetcher = new DataPrefetcher(mockObjectStorage.Object, - activitySource_, - loggerFactory.CreateLogger()); - - const string sessionId = "SessionId"; - const string parentTaskId = "ParentTaskId"; - const string taskId = "TaskId"; - const string output1 = "Output1"; - const string dependency1 = "Dependency1"; - const string podName = "PodName"; - const string podId = "PodId"; - const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "", - "", - "", - "", - ""), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); - Assert.AreNotEqual(0, - res.Count); - } - - [Test] - public async Task EmptyPayloadAndOneDependencyWithDataStateMachine() - { - var mockObjectStorage = new Mock(); - mockObjectStorage.Setup(x => x.GetValuesAsync(It.IsAny(), - CancellationToken.None)) - .Returns((string _, - CancellationToken _) => new List - { - Convert.FromBase64String("1111"), - Convert.FromBase64String("2222"), - Convert.FromBase64String("3333"), - Convert.FromBase64String("4444"), - }.ToAsyncEnumerable()); - - var loggerFactory = new LoggerFactory(); - - var dataPrefetcher = new DataPrefetcher(mockObjectStorage.Object, - activitySource_, - loggerFactory.CreateLogger()); - - const string sessionId = "SessionId"; - const string parentTaskId = "ParentTaskId"; - const string taskId = "TaskId"; - const string output1 = "Output1"; - const string dependency1 = "Dependency1"; - const string podId = "PodId"; - const string podName = "PodName"; - const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); - Assert.AreNotEqual(0, - res.Count); - } - - [Test] - public async Task PayloadWithDataAndOneDependencyWithDataStateMachine() - { - var mockObjectStorage = new Mock(); - mockObjectStorage.Setup(x => x.GetValuesAsync(It.IsAny(), - CancellationToken.None)) - .Returns((string _, - CancellationToken _) => new List - { - Convert.FromBase64String("1111"), - Convert.FromBase64String("2222"), - Convert.FromBase64String("3333"), - Convert.FromBase64String("4444"), - }.ToAsyncEnumerable()); - - var loggerFactory = new LoggerFactory(); - - var dataPrefetcher = new DataPrefetcher(mockObjectStorage.Object, - activitySource_, - loggerFactory.CreateLogger()); + var sharedFolder = Path.Combine(Path.GetTempPath(), + "data"); + var internalFolder = Path.Combine(Path.GetTempPath(), + "internal"); + Directory.Delete(sharedFolder, + true); + Directory.CreateDirectory(sharedFolder); + + await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, + taskId, + podId, + podName, + payloadId, + new[] + { + parentTaskId, + }, + new[] + { + dependency1, + }, + new[] + { + output1, + }, + Array.Empty(), + TaskStatus.Submitted, + new TaskOptions(new Dictionary(), + TimeSpan.FromSeconds(100), + 5, + 1, + "part1", + "applicationName", + "applicationVersion", + "applicationNamespace", + "applicationService", + "engineType"), + new Output(true, + "")), + sharedFolder, + CancellationToken.None) + .ConfigureAwait(false); - const string sessionId = "SessionId"; - const string parentTaskId = "ParentTaskId"; - const string taskId = "TaskId"; - const string output1 = "Output1"; - const string dependency1 = "Dependency1"; - const string podId = "PodId"; - const string podName = "PodName"; - const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); - Assert.AreNotEqual(0, - res.Count); + Assert.IsTrue(File.Exists(Path.Combine(sharedFolder, + payloadId))); + Assert.IsTrue(File.Exists(Path.Combine(sharedFolder, + dependency1))); } - [Test] - public async Task EmptyPayloadAndTwoDependenciesStateMachine() - { - var mockObjectStorage = new Mock(); - mockObjectStorage.Setup(x => x.GetValuesAsync(It.IsAny(), - CancellationToken.None)) - .Returns((string _, - CancellationToken _) => new List - { - Convert.FromBase64String("1111"), - Convert.FromBase64String("2222"), - Convert.FromBase64String("3333"), - Convert.FromBase64String("4444"), - }.ToAsyncEnumerable()); - - var loggerFactory = new LoggerFactory(); - - var dataPrefetcher = new DataPrefetcher(mockObjectStorage.Object, - activitySource_, - loggerFactory.CreateLogger()); - - const string sessionId = "SessionId"; - const string parentTaskId = "ParentTaskId"; - const string taskId = "TaskId"; - const string output1 = "Output1"; - const string dependency1 = "Dependency1"; - const string dependency2 = "Dependency2"; - const string podId = "PodId"; - const string podName = "PodName"; - const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - dependency2, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); - Assert.AreNotEqual(0, - res.Count); - } [Test] public async Task EmptyPayloadAndNoDependenciesStateMachine() @@ -515,43 +160,52 @@ public async Task EmptyPayloadAndNoDependenciesStateMachine() const string podId = "PodId"; const string podName = "PodName"; const string payloadId = "PayloadId"; - var res = await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, - taskId, - podId, - podName, - payloadId, - new[] - { - parentTaskId, - }, - new[] - { - dependency1, - dependency2, - }, - new[] - { - output1, - }, - Array.Empty(), - TaskStatus.Submitted, - new TaskOptions(new Dictionary(), - TimeSpan.FromSeconds(100), - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"), - new Output(true, - "")), - CancellationToken.None) - .ConfigureAwait(false); + var sharedFolder = Path.Combine(Path.GetTempPath(), + "data"); + var internalFolder = Path.Combine(Path.GetTempPath(), + "internal"); + Directory.Delete(sharedFolder, + true); + Directory.CreateDirectory(sharedFolder); + + await dataPrefetcher.PrefetchDataAsync(new TaskData(sessionId, + taskId, + podId, + podName, + payloadId, + new[] + { + parentTaskId, + }, + new[] + { + dependency1, + dependency2, + }, + new[] + { + output1, + }, + Array.Empty(), + TaskStatus.Submitted, + new TaskOptions(new Dictionary(), + TimeSpan.FromSeconds(100), + 5, + 1, + "part1", + "applicationName", + "applicationVersion", + "applicationNamespace", + "applicationService", + "engineType"), + new Output(true, + "")), + sharedFolder, + CancellationToken.None) + .ConfigureAwait(false); - Assert.AreNotEqual(0, - res.Count); + Assert.IsTrue(File.Exists(Path.Combine(sharedFolder, + payloadId))); } [Test] diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index bc4b62a63..ed4fa50a2 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -33,7 +33,6 @@ using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; using ArmoniK.Core.Common.Tests.Helpers; -using ArmoniK.Core.Common.Utils; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; @@ -42,7 +41,6 @@ using NUnit.Framework; -using Output = ArmoniK.Api.gRPC.V1.Output; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -214,10 +212,9 @@ public void Dispose() { } - public IAsyncPipe? Pipe { get; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) => throw new NotImplementedException(); } @@ -230,6 +227,7 @@ private class ReturnHealthCheckWorkerStreamHandler : IWorkerStreamHandler public ReturnHealthCheckWorkerStreamHandler(HealthCheckResult healthCheckResult) => healthCheckResult_ = healthCheckResult; + public Task Init(CancellationToken cancellationToken) { isInitialized_ = true; @@ -245,10 +243,9 @@ public void Dispose() { } - public IAsyncPipe? Pipe { get; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) => throw new NotImplementedException(); } @@ -422,45 +419,19 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public IAsyncPipe? Pipe { get; private set; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) - => Pipe = new WaitAsyncPipe(delay_); - } - - public class WaitAsyncPipe : IAsyncPipe - { - private readonly double delay_; - - public WaitAsyncPipe(double delay) - => delay_ = delay; - - public async Task ReadAsync(CancellationToken cancellationToken) + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) { - await Task.Delay(TimeSpan.FromMilliseconds(delay_), - CancellationToken.None) - .ConfigureAwait(false); - return new ProcessReply - { - CommunicationToken = "", - Output = new Output - { - Ok = new Empty(), - }, - }; + Task.Delay(TimeSpan.FromMilliseconds(delay_), + cancellationToken); + return Task.FromResult(new ProcessReply{ + Output = new Api.gRPC.V1.Output { Ok = new Empty()}, + }); } - - public Task WriteAsync(ProcessRequest message) - => Task.CompletedTask; - - public Task WriteAsync(IEnumerable message) - => Task.CompletedTask; - - public Task CompleteAsync() - => Task.CompletedTask; } + [Test] [TestCase(100)] [TestCase(5000)] // task should be longer than the grace delay @@ -499,9 +470,7 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.False(testServiceProvider.Pollster.Failed); Assert.True(source.Token.IsCancellationRequested); - Assert.AreEqual(delay < 1000 - ? TaskStatus.Completed - : TaskStatus.Processing, + Assert.AreEqual(TaskStatus.Completed, (await testServiceProvider.TaskTable.GetTaskStatus(new[] { taskSubmitted, @@ -592,7 +561,8 @@ public static IEnumerable ExecuteTooManyErrorShouldFailTestCase { // Failing WorkerStreamHandler var mockStreamHandlerFail = new Mock(); - mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new ApplicationException("Failed WorkerStreamHandler")); yield return new TestCaseData(mockStreamHandlerFail, @@ -619,6 +589,7 @@ public static IEnumerable ExecuteTooManyErrorShouldFailTestCase It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new ApplicationException("Failed agent")); @@ -662,7 +633,8 @@ public async Task UnavailableWorkerShouldFail() var simpleAgentHandler = new SimpleAgentHandler(); var mockStreamHandlerFail = new Mock(); - mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new TestUnavailableRpcException("Unavailable worker")); diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index f04f59af8..f84d68245 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -33,7 +33,6 @@ using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; using ArmoniK.Core.Common.Tests.Helpers; -using ArmoniK.Core.Common.Utils; using Grpc.Core; @@ -46,7 +45,6 @@ using NUnit.Framework; using Output = ArmoniK.Core.Common.Storage.Output; -using Result = ArmoniK.Core.Common.Storage.Result; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -1017,10 +1015,9 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public IAsyncPipe? Pipe { get; } - - public void StartTaskProcessing(TaskData taskData, - CancellationToken cancellationToken) + public Task StartTaskProcessing(ProcessRequest request, + TimeSpan duration, + CancellationToken cancellationToken) => throw new T(); } @@ -1072,10 +1069,6 @@ public static IEnumerable TestCaseOuptut yield return new TestCaseData(new ExceptionWorkerStreamHandler(3000)).Returns((TaskStatus.Submitted, QueueMessageStatus.Postponed)) .SetArgDisplayNames("RpcExceptionTaskCancellation"); - // Worker unavailable during execution should put the task in error - yield return new TestCaseData(new ExceptionWorkerStreamHandler(0)).Returns((TaskStatus.Retried, QueueMessageStatus.Cancelled)) - .SetArgDisplayNames("UnavailableBeforeCancellation"); - // If the worker becomes unavailable during the task execution after cancellation, the task should be resubmitted yield return new TestCaseData(new ExceptionWorkerStreamHandler(3000)).Returns((TaskStatus.Submitted, QueueMessageStatus.Postponed)) .SetArgDisplayNames("UnavailableAfterCancellation"); @@ -1280,9 +1273,9 @@ public static IEnumerable TestCaseOutputExecuteTaskWithErrorDuringStartInWorkerH { get { - yield return new TestCaseData(new ExceptionStartWorkerStreamHandler()).Returns((TaskStatus.Error, QueueMessageStatus.Processed)) + yield return new TestCaseData(new ExceptionStartWorkerStreamHandler()).Returns((TaskStatus.Retried, QueueMessageStatus.Cancelled)) .SetArgDisplayNames("Exception"); // error with resubmit - yield return new TestCaseData(new ExceptionStartWorkerStreamHandler()).Returns((TaskStatus.Error, QueueMessageStatus.Processed)) + yield return new TestCaseData(new ExceptionStartWorkerStreamHandler()).Returns((TaskStatus.Retried, QueueMessageStatus.Cancelled)) .SetArgDisplayNames("RpcException"); // error with resubmit // error meaning that the worker is unavailable, therefore a requeue is made yield return new TestCaseData(new ExceptionStartWorkerStreamHandler()).Returns((TaskStatus.Submitted, QueueMessageStatus.Postponed)) @@ -1335,22 +1328,11 @@ await testServiceProvider.TaskHandler.PreProcessing() } - public static IEnumerable TestCaseOutputExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow - { - get - { - yield return new TestCaseData(new ExceptionWorkerStreamHandler(100)).SetArgDisplayNames("RpcExceptionResubmit"); // error with resubmit - yield return - new TestCaseData(new ExceptionWorkerStreamHandler(100)) - .SetArgDisplayNames("RpcUnavailableExceptionResubmit"); // error with resubmit - } - } - [Test] - [TestCaseSource(nameof(TestCaseOutputExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow))] - public async Task ExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow(ExceptionWorkerStreamHandler sh) - where TEx : Exception, new() + public async Task ExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow() { + var sh = new ExceptionWorkerStreamHandler(100); + var sqmh = new SimpleQueueMessageHandler { CancellationToken = CancellationToken.None, @@ -1378,8 +1360,8 @@ public async Task ExecuteTaskWithErrorDuringExecutionInWorkerHandlerShouldThrow< await testServiceProvider.TaskHandler.PreProcessing() .ConfigureAwait(false); - Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask() - .ConfigureAwait(false)); + Assert.ThrowsAsync(async () => await testServiceProvider.TaskHandler.ExecuteTask() + .ConfigureAwait(false)); var taskData = await testServiceProvider.TaskTable.ReadTaskAsync(taskId, CancellationToken.None) @@ -1447,15 +1429,6 @@ await agentHandler.Agent.CreateTask(taskStreamReader, CancellationToken.None) .ConfigureAwait(false); - - var resultStreamReader = new TestHelperAsyncStreamReader(new[] - { - new Api.gRPC.V1.Agent.Result(), - }); - await agentHandler.Agent.SendResult(resultStreamReader, - CancellationToken.None) - .ConfigureAwait(false); - await testServiceProvider.TaskHandler.PostProcessing() .ConfigureAwait(false); diff --git a/Common/tests/StateMachines/ComputeRequestStateMachineTest.cs b/Common/tests/StateMachines/ComputeRequestStateMachineTest.cs deleted file mode 100644 index be44996b7..000000000 --- a/Common/tests/StateMachines/ComputeRequestStateMachineTest.cs +++ /dev/null @@ -1,202 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; - -using ArmoniK.Core.Common.StateMachines; - -using Microsoft.Extensions.Logging.Abstractions; - -using NUnit.Framework; - -namespace ArmoniK.Core.Common.Tests.StateMachines; - -[TestFixture] -public class ComputeRequestStateMachineTest -{ - [SetUp] - public void Setup() - => sm_ = new ComputeRequestStateMachine(NullLogger.Instance); - - private ComputeRequestStateMachine? sm_; - - [Test] - public void PayloadFirstShouldFail() - => Assert.Throws(() => sm_!.AddPayloadChunk()); - - [Test] - public void DataChunkFirstShouldFail() - => Assert.Throws(() => sm_!.AddDataDependencyChunk()); - - [Test] - public void InitDataFirstShouldFail() - => Assert.Throws(() => sm_!.InitDataDependency()); - - [Test] - public void TwoInitRequestsShouldFail() - { - sm_!.InitRequest(); - Assert.Throws(() => sm_.InitRequest()); - } - - [Test] - public void GetQueueWithoutPayloadCompleteShouldFail() - { - sm_!.InitRequest(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - - Assert.Throws(() => sm_.CompleteRequest()); - } - - [Test] - public void GetQueueWithPayloadCompleteShouldSucceed() - { - sm_!.InitRequest(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.CompletePayload(); - - sm_.CompleteRequest(); - - Assert.AreEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void DataDepWithoutChunkShouldFail() - { - sm_!.InitRequest(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.CompletePayload(); - - sm_.InitDataDependency(); - Assert.Throws(() => sm_.CompleteDataDependency()); - } - - [Test] - public void HappyPathShouldSucceed() - { - sm_!.InitRequest(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.CompletePayload(); - - sm_.InitDataDependency(); - sm_.AddDataDependencyChunk(); - sm_.CompleteDataDependency(); - - sm_.InitDataDependency(); - sm_.AddDataDependencyChunk(); - sm_.CompleteDataDependency(); - - sm_.CompleteRequest(); - Assert.AreEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void HappyPathSmallShouldSucceed() - { - sm_!.InitRequest(); - sm_.CompletePayload(); - - sm_.InitDataDependency(); - sm_.AddDataDependencyChunk(); - sm_.CompleteDataDependency(); - - sm_.CompleteRequest(); - Assert.AreEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void HappyPathNoDataDepShouldSucceed() - { - sm_!.InitRequest(); - sm_.CompletePayload(); - sm_.CompleteRequest(); - Assert.AreEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void HappyPathNoDataDepNoGetQueueShouldFail() - { - sm_!.InitRequest(); - sm_.CompletePayload(); - Assert.AreNotEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void HappyPathMultipleLargeDataShouldSucceed() - { - sm_!.InitRequest(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.AddPayloadChunk(); - sm_.CompletePayload(); - - sm_.InitDataDependency(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.CompleteDataDependency(); - - sm_.InitDataDependency(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.AddDataDependencyChunk(); - sm_.CompleteDataDependency(); - - sm_.CompleteRequest(); - Assert.AreEqual(ComputeRequestStateMachine.State.DataLast, - sm_.GetState()); - } - - [Test] - public void GenerateGraphShouldSucceed() - { - var str = sm_!.GenerateGraph(); - Console.WriteLine(str); - Assert.IsFalse(string.IsNullOrEmpty(str)); - } - - [Test] - public void GenerateMermaidGraphShouldSucceed() - { - var str = sm_!.GenerateMermaidGraph(); - Console.WriteLine(str); - Assert.IsFalse(string.IsNullOrEmpty(str)); - } -} diff --git a/Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj b/Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj index b18d0a25e..b3d0e271b 100644 --- a/Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj +++ b/Tests/Bench/Server/src/ArmoniK.Samples.Bench.Server.csproj @@ -25,7 +25,7 @@ - + diff --git a/Tests/Common/Client/src/ArmoniK.Core.Common.Tests.Client.csproj b/Tests/Common/Client/src/ArmoniK.Core.Common.Tests.Client.csproj index bf730cf53..ca1503922 100644 --- a/Tests/Common/Client/src/ArmoniK.Core.Common.Tests.Client.csproj +++ b/Tests/Common/Client/src/ArmoniK.Core.Common.Tests.Client.csproj @@ -25,7 +25,7 @@ - + diff --git a/Tests/HtcMock/Server/src/ArmoniK.Samples.HtcMock.Server.csproj b/Tests/HtcMock/Server/src/ArmoniK.Samples.HtcMock.Server.csproj index 6df8f14f6..d64ad75a4 100644 --- a/Tests/HtcMock/Server/src/ArmoniK.Samples.HtcMock.Server.csproj +++ b/Tests/HtcMock/Server/src/ArmoniK.Samples.HtcMock.Server.csproj @@ -25,7 +25,7 @@ - + diff --git a/Tests/Stream/Client/ArmoniK.Extensions.Common.StreamWrapper.Tests.Client.csproj b/Tests/Stream/Client/ArmoniK.Extensions.Common.StreamWrapper.Tests.Client.csproj index 7c640ba16..eb4968378 100644 --- a/Tests/Stream/Client/ArmoniK.Extensions.Common.StreamWrapper.Tests.Client.csproj +++ b/Tests/Stream/Client/ArmoniK.Extensions.Common.StreamWrapper.Tests.Client.csproj @@ -25,7 +25,7 @@ - + diff --git a/Tests/Stream/Server/ArmoniK.Extensions.Common.StreamWrapper.Tests.Server.csproj b/Tests/Stream/Server/ArmoniK.Extensions.Common.StreamWrapper.Tests.Server.csproj index 9cba8131a..1ab32ffea 100644 --- a/Tests/Stream/Server/ArmoniK.Extensions.Common.StreamWrapper.Tests.Server.csproj +++ b/Tests/Stream/Server/ArmoniK.Extensions.Common.StreamWrapper.Tests.Server.csproj @@ -24,7 +24,7 @@ - +