From 9eb260f7f96cf39d5a3295d2d3be0fa110ee29ba Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Fri, 7 Jun 2024 20:31:12 +0200 Subject: [PATCH 1/4] Parallelize submission --- .../Common/Submitter/BaseClientSubmitter.cs | 428 +++++++++--------- 1 file changed, 220 insertions(+), 208 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 26749b46b..55fd77636 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -34,6 +34,7 @@ using ArmoniK.DevelopmentKit.Client.Common.Status; using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.Common.Exceptions; +using ArmoniK.DevelopmentKit.Common.Utils; using ArmoniK.Utils; using Google.Protobuf; @@ -301,23 +302,16 @@ public Output GetTaskOutputInfo(string taskId) /// The result ids must first be created using /// return a list of taskIds of the created tasks [PublicAPI] - public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, - int maxRetries = 5, - TaskOptions? taskOptions = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_)) - { - await foreach (var taskIds in ChunkSubmitTasksWithDependenciesAsync(chunk, - maxRetries, - taskOptions ?? TaskOptions, - cancellationToken) - .ConfigureAwait(false)) - { - yield return taskIds; - } - } - } + public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => payloadsWithDependencies.ToChunks(chunkSubmitSize_) + .ToAsyncEnumerable() + .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)tuple.Item1, tuple.Item2, tuple.Item3)), + maxRetries, + taskOptions, + cancellationToken)); /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -356,33 +350,16 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable /// return a list of taskIds of the created tasks [PublicAPI] - public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, - int maxRetries = 5, - TaskOptions? taskOptions = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_)) - { - var resultsMetadata = await CreateResultsMetadataAsync(Enumerable.Range(0, - chunk.Length) - .Select(_ => Guid.NewGuid() - .ToString()), - cancellationToken) - .ConfigureAwait(false); - var tasks = ChunkSubmitTasksWithDependenciesAsync(chunk.Zip(resultsMetadata, - (payloadWithDependencies, - metadata) => Tuple.Create(metadata.Value, - payloadWithDependencies.Item1, - payloadWithDependencies.Item2)), - maxRetries, - taskOptions ?? TaskOptions, - cancellationToken); - await foreach (var taskId in tasks.ConfigureAwait(false)) - { - yield return taskId; - } - } - } + public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => payloadsWithDependencies.ToChunks(chunkSubmitSize_) + .ToAsyncEnumerable() + .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)null, tuple.Item1, tuple.Item2)), + maxRetries, + taskOptions, + cancellationToken)); /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -420,191 +397,226 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable /// /// return the ids of the created tasks - private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, - int maxRetries, - TaskOptions? taskOptions = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEnumerable<(string?, byte[], IList)> payloadsWithDependencies, + int maxRetries, + TaskOptions? taskOptions = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(); - var tasks = new List(); + var taskProperties = new List<(Either, int, bool, IList)>(); + var smallPayloadProperties = new List(); + var largePayloadProperties = new List<(byte[], int)>(); + var nbResults = 0; foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { - for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) + Either result; + if (resultId is null) { - await using var channel = await ChannelPool.GetAsync(cancellationToken) - .ConfigureAwait(false); - var resultsClient = new Results.ResultsClient(channel); - - try - { - // todo: migrate to ArmoniK.Api - string payloadId; - if (payload.Length > configuration_) - { - var response = await resultsClient.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest - { - SessionId = SessionId.Id, - Results = - { - new CreateResultsMetaDataRequest.Types.ResultCreate(), - }, - }, - cancellationToken: cancellationToken) - .ConfigureAwait(false); - payloadId = response.Results.Select(raw => raw.ResultId) - .Single(); - - await resultsClient.UploadResultData(SessionId.Id, - payloadId, - payload); - } - else - { - var response = await resultsClient.CreateResultsAsync(new CreateResultsRequest - { - SessionId = SessionId.Id, - Results = - { - new CreateResultsRequest.Types.ResultCreate - { - Data = UnsafeByteOperations.UnsafeWrap(payload), - }, - }, - }, - cancellationToken: cancellationToken) - .ConfigureAwait(false); - payloadId = response.Results.Select(raw => raw.ResultId) - .Single(); - } - - - tasks.Add(new SubmitTasksRequest.Types.TaskCreation - { - PayloadId = payloadId, - DataDependencies = - { - dependencies, - }, - ExpectedOutputKeys = - { - resultId, - }, - }); - // break retry loop because submission is successful - break; - } - catch (Exception e) - { - if (nbRetry >= maxRetries - 1) - { - throw; - } - - var innerException = e is AggregateException - { - InnerExceptions.Count: 1, - } agg - ? agg.InnerException - : e; - - switch (innerException) - { - case RpcException: - case IOException: - Logger.LogWarning(innerException, - "Failure to submit : Retrying"); - break; - default: - Logger.LogError(innerException, - "Unknown failure"); - throw; - } - - if (nbRetry > 0) - { - Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}", - nbRetry, - maxRetries, - resultId); - } - } + result = nbResults; + nbResults += 1; } - } - - foreach (var taskChunk in tasks.ToChunks(100)) - { - if (taskChunk.Length == 0) + else { - continue; + result = resultId; } - SubmitTasksResponse? submitTasksResponse = null; - for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) + int payloadIndex; + bool isLarge; + if (payload.Length > configuration_) { - await using var channel = await ChannelPool.GetAsync(cancellationToken) - .ConfigureAwait(false); - var tasksClient = new Tasks.TasksClient(channel); + payloadIndex = largePayloadProperties.Count; + largePayloadProperties.Add((payload, nbResults)); + nbResults += 1; + isLarge = true; + } + else + { + payloadIndex = smallPayloadProperties.Count; + smallPayloadProperties.Add(payload); + isLarge = false; + } - try - { - submitTasksResponse = await tasksClient.SubmitTasksAsync(new SubmitTasksRequest - { - TaskOptions = taskOptions, - SessionId = SessionId.Id, - TaskCreations = - { - taskChunk, - }, - }, - cancellationToken: cancellationToken) - .ConfigureAwait(false); + taskProperties.Add((result, payloadIndex, isLarge, dependencies)); + } - // break retry loop because submission is successful - break; - } - catch (Exception e) - { - if (nbRetry >= maxRetries - 1) - { - throw; - } + var uploadSmallPayloads = smallPayloadProperties.ParallelSelect(new ParallelTaskOptions(1, + cancellationToken), + payload => Retry.WhileException(5, + 2000, + async _ => + { + await using var channel = + await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var resultClient = new Results.ResultsClient(channel); + var response = await resultClient + .CreateResultsAsync(new CreateResultsRequest + { + SessionId = + SessionId.Id, + Results = + { + new + CreateResultsRequest. + Types.ResultCreate + { + Data = + UnsafeByteOperations + .UnsafeWrap(payload), + }, + }, + }, + cancellationToken: + cancellationToken) + .ConfigureAwait(false); + + return response.Results.Single() + .ResultId; + }, + true, + Logger, + cancellationToken, + typeof(IOException), + typeof(RpcException)) + .AsTask()) + .ToListAsync(cancellationToken); + + var createResultMetadata = Retry.WhileException(5, + 2000, + async _ => + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var resultClient = new Results.ResultsClient(channel); + var response = await resultClient.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + Enumerable.Range(0, + nbResults) + .Select(_ => new + CreateResultsMetaDataRequest. + Types.ResultCreate()), + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return response.Results.Select(result => result.ResultId) + .AsIList(); + }, + true, + Logger, + cancellationToken, + typeof(IOException), + typeof(RpcException)) + .AsTask(); + + + var uploadLargePayloads = largePayloadProperties.ParallelForEach(new ParallelTaskOptions(1, + cancellationToken), + async payload => + { + var results = await createResultMetadata.ConfigureAwait(false); + + await Retry.WhileException(5, + 2000, + async _ => + { + var resultId = results[payload.Item2]; + await using var channel = + await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var resultClient = new Results.ResultsClient(channel); + + await resultClient.UploadResultData(SessionId.Id, + resultId, + payload.Item1) + .ConfigureAwait(false); + }, + true, + Logger, + cancellationToken, + typeof(IOException), + typeof(RpcException)) + .ConfigureAwait(false); + }); + + var results = await createResultMetadata.ConfigureAwait(false); + var smallPayloads = await uploadSmallPayloads.ConfigureAwait(false); + + var tasks = taskProperties.Select(tuple => + { + var (result, payloadIndex, isLarge, dependencies) = tuple; + var resultId = (string?)result ?? results[(int)result]!; - var innerException = e is AggregateException - { - InnerExceptions.Count: 1, - } agg - ? agg.InnerException - : e; + var payloadId = isLarge + ? results[largePayloadProperties[payloadIndex] + .Item2] + : smallPayloads[payloadIndex]; - switch (innerException) - { - case RpcException: - case IOException: - Logger.LogWarning(innerException, - "Failure to submit : Retrying"); - break; - default: - Logger.LogError(innerException, - "Unknown failure"); - throw; - } + return new SubmitTasksRequest.Types.TaskCreation + { + PayloadId = payloadId, + DataDependencies = + { + dependencies, + }, + ExpectedOutputKeys = + { + resultId, + }, + }; + }) + .AsIList(); + + var taskSubmit = tasks.ToChunks(100) + .ParallelSelect(new ParallelTaskOptions(1, + cancellationToken), + async taskChunk => + { + var response = await Retry.WhileException(5, + 2000, + async _ => + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var taskClient = new Tasks.TasksClient(channel); + + return await taskClient.SubmitTasksAsync(new SubmitTasksRequest + { + TaskOptions = taskOptions, + SessionId = SessionId.Id, + TaskCreations = + { + taskChunk, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + }, + true, + Logger, + cancellationToken, + typeof(IOException), + typeof(RpcException)) + .ConfigureAwait(false); - if (nbRetry > 0) - { - Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks", - nbRetry, - maxRetries); - } - } - } + return response.TaskInfos.Select(task => task.TaskId); + }); - foreach (var taskInfo in submitTasksResponse!.TaskInfos) + await foreach (var taskChunk in taskSubmit.ConfigureAwait(false)) + { + foreach (var task in taskChunk) { - yield return taskInfo.TaskId; + yield return task; } } + + await uploadLargePayloads.ConfigureAwait(false); } /// From 58dff06ef24976ca77e490e3c42b12601c3caf64 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Sat, 8 Jun 2024 00:01:05 +0200 Subject: [PATCH 2/4] Remove Dataflow --- .../Common/Submitter/BaseClientSubmitter.cs | 40 +-- ...moniK.DevelopmentKit.Client.Unified.csproj | 1 - .../BlockRequest.cs => IsExternalInit.cs} | 23 +- .../Submitter/BatchUntilInactiveBlock.cs | 196 -------------- .../src/Unified/Services/Submitter/Service.cs | 252 ++++++------------ 5 files changed, 111 insertions(+), 401 deletions(-) rename Client/src/Unified/{Services/Submitter/BlockRequest.cs => IsExternalInit.cs} (53%) delete mode 100644 Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 55fd77636..68ca22790 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -308,10 +308,12 @@ public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable payloadsWithDependencies.ToChunks(chunkSubmitSize_) .ToAsyncEnumerable() - .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)tuple.Item1, tuple.Item2, tuple.Item3)), + .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)tuple.Item1, tuple.Item2, tuple.Item3, + (TaskOptions?)null)), maxRetries, taskOptions, - cancellationToken)); + cancellationToken)) + .Select(task => task.taskId); /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -356,10 +358,12 @@ public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable payloadsWithDependencies.ToChunks(chunkSubmitSize_) .ToAsyncEnumerable() - .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)null, tuple.Item1, tuple.Item2)), + .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)null, tuple.Item1, tuple.Item2, + (TaskOptions?)null)), maxRetries, taskOptions, - cancellationToken)); + cancellationToken)) + .Select(task => task.taskId); /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -397,19 +401,20 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable /// /// return the ids of the created tasks - private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEnumerable<(string?, byte[], IList)> payloadsWithDependencies, - int maxRetries, - TaskOptions? taskOptions = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable<(string taskId, string resultId)> ChunkSubmitTasksWithDependenciesAsync( + IEnumerable<(string?, byte[], IList, TaskOptions?)> payloadsWithDependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(); - var taskProperties = new List<(Either, int, bool, IList)>(); + var taskProperties = new List<(Either, int, bool, IList, TaskOptions?)>(); var smallPayloadProperties = new List(); var largePayloadProperties = new List<(byte[], int)>(); var nbResults = 0; - foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) + foreach (var (resultId, payload, dependencies, specificTaskOptions) in payloadsWithDependencies) { Either result; if (resultId is null) @@ -438,12 +443,12 @@ private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEn isLarge = false; } - taskProperties.Add((result, payloadIndex, isLarge, dependencies)); + taskProperties.Add((result, payloadIndex, isLarge, dependencies, specificTaskOptions)); } var uploadSmallPayloads = smallPayloadProperties.ParallelSelect(new ParallelTaskOptions(1, cancellationToken), - payload => Retry.WhileException(5, + payload => Retry.WhileException(maxRetries, 2000, async _ => { @@ -483,7 +488,7 @@ await ChannelPool.GetAsync(cancellationToken) .AsTask()) .ToListAsync(cancellationToken); - var createResultMetadata = Retry.WhileException(5, + var createResultMetadata = Retry.WhileException(maxRetries, 2000, async _ => { @@ -522,7 +527,7 @@ await ChannelPool.GetAsync(cancellationToken) { var results = await createResultMetadata.ConfigureAwait(false); - await Retry.WhileException(5, + await Retry.WhileException(maxRetries, 2000, async _ => { @@ -550,7 +555,7 @@ await resultClient.UploadResultData(SessionId.Id, var tasks = taskProperties.Select(tuple => { - var (result, payloadIndex, isLarge, dependencies) = tuple; + var (result, payloadIndex, isLarge, dependencies, specificTaskOptions) = tuple; var resultId = (string?)result ?? results[(int)result]!; var payloadId = isLarge @@ -569,6 +574,7 @@ await resultClient.UploadResultData(SessionId.Id, { resultId, }, + TaskOptions = specificTaskOptions, }; }) .AsIList(); @@ -578,7 +584,7 @@ await resultClient.UploadResultData(SessionId.Id, cancellationToken), async taskChunk => { - var response = await Retry.WhileException(5, + var response = await Retry.WhileException(maxRetries, 2000, async _ => { @@ -605,7 +611,7 @@ await resultClient.UploadResultData(SessionId.Id, typeof(RpcException)) .ConfigureAwait(false); - return response.TaskInfos.Select(task => task.TaskId); + return response.TaskInfos.Select(task => (task.TaskId, task.ExpectedOutputIds.Single())); }); await foreach (var taskChunk in taskSubmit.ConfigureAwait(false)) diff --git a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj index d562a2425..5f6308e57 100644 --- a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj +++ b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj @@ -11,7 +11,6 @@ - diff --git a/Client/src/Unified/Services/Submitter/BlockRequest.cs b/Client/src/Unified/IsExternalInit.cs similarity index 53% rename from Client/src/Unified/Services/Submitter/BlockRequest.cs rename to Client/src/Unified/IsExternalInit.cs index aa47ef974..a0c89d771 100644 --- a/Client/src/Unified/Services/Submitter/BlockRequest.cs +++ b/Client/src/Unified/IsExternalInit.cs @@ -14,25 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System; -using System.Threading; -using ArmoniK.Api.gRPC.V1; -using ArmoniK.DevelopmentKit.Client.Common; -using ArmoniK.DevelopmentKit.Common; +#if NETFRAMEWORK || NETSTANDARD +// This type is required to use initializers when compiling to framework +namespace System.Runtime.CompilerServices; -namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter; - -internal class BlockRequest +internal static class IsExternalInit { - public IServiceInvocationHandler Handler; - - public ArmonikPayload? Payload { get; set; } - - public SemaphoreSlim Lock { get; set; } - public Guid SubmitId { get; set; } - - public string ResultId { get; set; } - public int MaxRetries { get; set; } = 5; - public TaskOptions TaskOptions { get; set; } } +#endif diff --git a/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs b/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs deleted file mode 100644 index 7ade944d1..000000000 --- a/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs +++ /dev/null @@ -1,196 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License") -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; - -namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter; - -/// -/// Provides a dataFlow block that batches inputs into arrays. -/// A batch is produced when the number of currently queued items becomes equal -/// to BatchSize, or when a Timeout period has elapsed after receiving the last item. -/// -public class BatchUntilInactiveBlock : IPropagatorBlock, IReceivableSourceBlock -{ - private readonly ExecutionDataflowBlockOptions executionDataFlowBlockOptions_; - private readonly BatchBlock source_; - private readonly TransformBlock timeoutTransformBlock_; - private readonly Timer timer_; - - /// - /// The buffer construct base on the number of request in the buffer - /// Be aware that buffer should be T sized for network B/W - /// - /// - /// Time out before the next submit call - /// - /// Parameters to control execution for each block in pipeline - /// Options to configure message. - /// https://learn.microsoft.com/fr-fr/dotnet/api/system.threading.tasks.dataflow.executiondataflowblockoptions?view=net-6.0 - /// - public BatchUntilInactiveBlock(int bufferRequestsSize, - TimeSpan timeout, - ExecutionDataflowBlockOptions? executionDataFlowBlockOptions = null) - { - executionDataFlowBlockOptions_ = executionDataFlowBlockOptions ?? new ExecutionDataflowBlockOptions - { - BoundedCapacity = 1, - MaxDegreeOfParallelism = 1, - EnsureOrdered = true, - }; - - source_ = new BatchBlock(bufferRequestsSize, - new GroupingDataflowBlockOptions - { - BoundedCapacity = bufferRequestsSize, - EnsureOrdered = true, - }); - - timer_ = new Timer(_ => - { - source_.TriggerBatch(); - }, - null, - timeout, - System.Threading.Timeout.InfiniteTimeSpan); - - timeoutTransformBlock_ = new TransformBlock(value => - { - timer_.Change(timeout, - System.Threading.Timeout.InfiniteTimeSpan); - - return value; - }, - executionDataFlowBlockOptions_); - - source_.LinkTo(timeoutTransformBlock_, - new DataflowLinkOptions - { - PropagateCompletion = true, - }); - - Timeout = timeout; - } - - /// - /// Simple Getter to return size of batch in the pipeline - /// - public int BatchSize - => source_.BatchSize; - - /// - /// Return the TimeSpan timer set in the constructor - /// - private TimeSpan Timeout { get; } - - /// - public Task Completion - => source_.Completion; - - - /// - public void Complete() - => source_.Complete(); - - /// - public void Fault(Exception exception) - => ((IDataflowBlock)source_).Fault(exception); - - - /// - public IDisposable LinkTo(ITargetBlock target, - DataflowLinkOptions linkOptions) - => timeoutTransformBlock_.LinkTo(target, - linkOptions); - - - /// - public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, - T messageValue, - ISourceBlock source, - bool consumeToAccept) - { - var offerResult = ((ITargetBlock)source_).OfferMessage(messageHeader, - messageValue, - source, - consumeToAccept); - - if (offerResult == DataflowMessageStatus.Accepted) - { - timer_.Change(Timeout, - System.Threading.Timeout.InfiniteTimeSpan); - } - - return offerResult; - } - - /// - public T[] ConsumeMessage(DataflowMessageHeader messageHeader, - ITargetBlock target, - out bool messageConsumed) - => ((ISourceBlock)source_).ConsumeMessage(messageHeader, - target, - out messageConsumed); - - /// - public bool ReserveMessage(DataflowMessageHeader messageHeader, - ITargetBlock target) - => ((ISourceBlock)source_).ReserveMessage(messageHeader, - target); - - /// - public void ReleaseReservation(DataflowMessageHeader messageHeader, - ITargetBlock target) - => ((ISourceBlock)source_).ReleaseReservation(messageHeader, - target); - - /// - public bool TryReceive(Predicate filter, - out T[] item) - => source_.TryReceive(filter, - out item); - - /// - public bool TryReceiveAll(out IList items) - => source_.TryReceiveAll(out items); - - /// - /// Create an ActionBlock with a delegated function to execute - /// at the end of pipeline - /// - /// the method to call - public void ExecuteAsync(Action action) - { - var actBlock = new ActionBlock(action, - executionDataFlowBlockOptions_); - - timeoutTransformBlock_.LinkTo(actBlock, - new DataflowLinkOptions - { - PropagateCompletion = true, - }); - } - - /// - /// Trigger the batch even if it doesn't criteria to submit - /// - public void TriggerBatch() - => source_.TriggerBatch(); -} diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index 8dfa34c4a..f5ce06136 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -19,8 +19,8 @@ using System.IO; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; @@ -52,11 +52,6 @@ namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter; [MarkDownDoc] public class Service : AbstractClientService, ISubmitterService { - private readonly RequestTaskMap requestTaskMap_ = new(); - - - private readonly SemaphoreSlim semaphoreSlim_; - /// /// The default constructor to open connection with the control plane /// and create the session to ArmoniK @@ -67,156 +62,69 @@ public Service(Properties properties, ILoggerFactory? loggerFactory = null) : base(loggerFactory) { - var timeOutSending = properties.TimeTriggerBuffer ?? TimeSpan.FromSeconds(1); - - var maxTasksPerBuffer = properties.MaxTasksPerBuffer; + SessionServiceFactory = new SessionServiceFactory(LoggerFactory); + SessionService = SessionServiceFactory.CreateSession(properties); + CancellationResultTaskSource = new CancellationTokenSource(); + Logger = LoggerFactory.CreateLogger(); + Logger.BeginPropertyScope(("SessionId", SessionService.SessionId), + ("Class", "Service")); - semaphoreSlim_ = new SemaphoreSlim(properties.MaxConcurrentBuffers * maxTasksPerBuffer); + var submitChannel = Channel.CreateUnbounded(); + SubmitChannel = submitChannel.Writer; + + var cancellationToken = CancellationResultTaskSource.Token; + var requests = submitChannel.Reader.ToAsyncEnumerable(cancellationToken); + + SubmitTask = Task.Run(() => requests.ToChunksAsync(properties.MaxTasksPerBuffer, + properties.TimeTriggerBuffer ?? TimeSpan.FromSeconds(1), + cancellationToken) + .ParallelForEach(new ParallelTaskOptions(properties.MaxConcurrentBuffers, + cancellationToken), + async chunk => + { + var taskRequests = chunk.Select(req => ((string?)null, req.Payload, req.Dependencies, req.TaskOptions)); + var response = SessionService.ChunkSubmitTasksWithDependenciesAsync(taskRequests, + cancellationToken: cancellationToken); + + List<(string taskId, string resultId)> tasks; + try + { + tasks = await response.ToListAsync(cancellationToken) + .ConfigureAwait(false); + } + catch (Exception e) + { + foreach (var req in chunk) + { + req.Tcs.SetException(e); + } - SessionServiceFactory = new SessionServiceFactory(LoggerFactory); + return; + } - SessionService = SessionServiceFactory.CreateSession(properties); + foreach (var (task, submission) in tasks.Zip(chunk, + (s, + submission) => (s, submission))) + { + ResultHandlerDictionary[task.taskId] = submission.Handler; + submission.Tcs.SetResult(task.taskId); + } + })); - CancellationResultTaskSource = new CancellationTokenSource(); HandlerResponse = Task.Run(ResultTask, CancellationResultTaskSource.Token); - - Logger = LoggerFactory.CreateLogger(); - Logger.BeginPropertyScope(("SessionId", SessionService.SessionId), - ("Class", "Service")); - - BufferSubmit = new BatchUntilInactiveBlock(maxTasksPerBuffer, - timeOutSending, - new ExecutionDataflowBlockOptions - { - BoundedCapacity = properties.MaxParallelChannels, - MaxDegreeOfParallelism = properties.MaxParallelChannels, - }); - - BufferSubmit.ExecuteAsync(blockRequests => - { - var blockRequestList = blockRequests.ToList(); - - try - { - if (blockRequestList.Count == 0) - { - return; - } - - Logger.LogInformation("Submitting buffer of {count} task...", - blockRequestList.Count); - var query = blockRequestList.GroupBy(blockRequest => blockRequest.TaskOptions); - - foreach (var groupBlockRequest in query) - { - var maxRetries = groupBlockRequest.First() - .MaxRetries; - //Generate resultId - var resultsIds = SessionService.CreateResultsMetadata(groupBlockRequest.Select(_ => Guid.NewGuid() - .ToString())) - .Values.ToList(); - - foreach (var (request, index) in groupBlockRequest.Select((r, - i) => (r, i))) - { - request.ResultId = resultsIds[index]; - } - - var currentBackoff = properties.RetryInitialBackoff; - for (var retry = 0; retry < maxRetries; retry++) - { - try - { - var taskIds = - SessionService.SubmitTasksWithDependencies(groupBlockRequest.Select(x => new Tuple>(x.ResultId, - x.Payload! - .Serialize(), - Array - .Empty< - string>())), - 1, - groupBlockRequest.First() - .TaskOptions); - - - var ids = taskIds.ToList(); - var mapTaskResults = SessionService.GetResultIds(ids); - var taskIdsResultIds = mapTaskResults.ToDictionary(result => result.ResultIds.Single(), - result => result.TaskId); - - - foreach (var pairTaskIdResultId in taskIdsResultIds) - { - var blockRequest = groupBlockRequest.FirstOrDefault(x => x.ResultId == pairTaskIdResultId.Key) ?? - throw new InvalidOperationException($"Cannot find BlockRequest with result id {pairTaskIdResultId.Value}"); - - ResultHandlerDictionary[pairTaskIdResultId.Value] = blockRequest.Handler; - - requestTaskMap_.PutResponse(blockRequest.SubmitId, - pairTaskIdResultId.Value); - } - - if (ids.Count > taskIdsResultIds.Count) - { - Logger.LogWarning("Fail to submit all tasks at once, retry with missing tasks"); - - throw new Exception("Fail to submit all tasks at once. Retrying..."); - } - - break; - } - catch (Exception e) - { - if (retry >= maxRetries - 1) - { - Logger.LogError(e, - "Fail to retry {count} times of submission. Stop trying to submit", - maxRetries); - throw; - } - - Logger?.LogWarning(e, - "Fail to submit, {retry}/{maxRetries} retrying", - retry + 1, - maxRetries); - - //Delay before submission - Task.Delay(currentBackoff) - .Wait(); - currentBackoff = TimeSpan.FromSeconds(Math.Min(currentBackoff.TotalSeconds * properties.RetryBackoffMultiplier, - properties.RetryMaxBackoff.TotalSeconds)); - } - } - - foreach (var blockRequest in groupBlockRequest) - { - blockRequest.Lock.Release(); - } - } - } - catch (Exception e) - { - Logger.LogError(e, - "Fail to submit buffer with {count} tasks inside", - blockRequestList.Count); - - requestTaskMap_.BufferFailures(blockRequestList.Select(block => block.SubmitId), - e); - } - }); } + private Task SubmitTask { get; } + private ChannelWriter SubmitChannel { get; } + /// /// Property Get the SessionId /// [PublicAPI] public SessionService SessionService { get; } - - private BatchUntilInactiveBlock BufferSubmit { get; } - private ILogger Logger { get; } private SessionServiceFactory SessionServiceFactory { get; } @@ -299,9 +207,22 @@ public async Task SubmitAsync(string methodName, public override void Dispose() { CancellationResultTaskSource.Cancel(); - HandlerResponse.Wait(); - HandlerResponse.Dispose(); - semaphoreSlim_.Dispose(); + + foreach (var awaitable in new[] + { + HandlerResponse, + SubmitTask, + }) + { + try + { + awaitable.WaitSync(); + awaitable.Dispose(); + } + catch (OperationCanceledException) + { + } + } GC.SuppressFinalize(this); } @@ -365,31 +286,18 @@ private async Task SubmitAsync(string methodName, bool serializedArguments, CancellationToken token) { - await semaphoreSlim_.WaitAsync(token); - - return await SubmitAsync(new BlockRequest - { - SubmitId = Guid.NewGuid(), - Payload = new ArmonikPayload(methodName, - argument, - serializedArguments), - Handler = handler, - MaxRetries = maxRetries, - TaskOptions = taskOptions ?? SessionService.TaskOptions, - Lock = semaphoreSlim_, - }, - token) - .ConfigureAwait(false); - } - - private async Task SubmitAsync(BlockRequest blockRequest, - CancellationToken token = default) - { - await BufferSubmit.SendAsync(blockRequest, - token) - .ConfigureAwait(false); - - return await requestTaskMap_.GetResponseAsync(blockRequest.SubmitId); + var tcs = new TaskCompletionSource(); + await SubmitChannel.WriteAsync(new TaskSubmission(new ArmonikPayload(methodName, + argument, + serializedArguments).Serialize(), + Array.Empty(), + taskOptions, + handler, + tcs), + token) + .ConfigureAwait(false); + + return await tcs.Task.ConfigureAwait(false); } /// @@ -800,6 +708,12 @@ private void ResultTask() public ObjectPool GetChannelPool() => SessionService.ChannelPool; + private record TaskSubmission(byte[] Payload, + IList Dependencies, + TaskOptions? TaskOptions, + IServiceInvocationHandler Handler, + TaskCompletionSource? Tcs); + /// /// Class to return TaskId and the result /// From 035e12fd6681cf76af466beccc63b8f28996717f Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Mon, 10 Jun 2024 23:49:05 +0200 Subject: [PATCH 3/4] configurable parallelism --- Client/src/Common/Submitter/BaseClientSubmitter.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 68ca22790..525ff0dcb 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -446,7 +446,7 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable Retry.WhileException(maxRetries, 2000, @@ -521,7 +521,7 @@ await ChannelPool.GetAsync(cancellationToken) .AsTask(); - var uploadLargePayloads = largePayloadProperties.ParallelForEach(new ParallelTaskOptions(1, + var uploadLargePayloads = largePayloadProperties.ParallelForEach(new ParallelTaskOptions(properties_.MaxParallelChannels, cancellationToken), async payload => { @@ -779,7 +779,9 @@ public async ValueTask GetResultStatusAsync(IEnumerable< nameof(Results.ResultsClient.GetResult)); return await result2TaskDic.Keys.ToChunks(100) - .ParallelSelect(async chunk => + .ParallelSelect(new ParallelTaskOptions(properties_.MaxParallelChannels, + cancellationToken), + async chunk => { await using var channel = await ChannelPool.GetAsync(cancellationToken) .ConfigureAwait(false); From 0432dae12451ac8a2536ec59c78827fce507e5c0 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Mon, 17 Jun 2024 16:04:01 +0200 Subject: [PATCH 4/4] Add logs --- Client/src/Unified/Services/Submitter/Service.cs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index f5ce06136..a1130831b 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -82,18 +82,23 @@ public Service(Properties properties, cancellationToken), async chunk => { - var taskRequests = chunk.Select(req => ((string?)null, req.Payload, req.Dependencies, req.TaskOptions)); - var response = SessionService.ChunkSubmitTasksWithDependenciesAsync(taskRequests, - cancellationToken: cancellationToken); + Logger?.LogInformation("Submitting batch of {NbTask}/{MaxTask}", + chunk.Length, + properties.MaxTasksPerBuffer); List<(string taskId, string resultId)> tasks; try { + var taskRequests = chunk.Select(req => ((string?)null, req.Payload, req.Dependencies, req.TaskOptions)); + var response = SessionService.ChunkSubmitTasksWithDependenciesAsync(taskRequests, + cancellationToken: cancellationToken); tasks = await response.ToListAsync(cancellationToken) .ConfigureAwait(false); } catch (Exception e) { + Logger?.LogError(e, + "Error while submitting tasks"); foreach (var req in chunk) { req.Tcs.SetException(e);