From d23e25e1a4c7e5fb773a4a874125cc7e3d67b200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Mon, 19 Aug 2024 17:49:43 +0200 Subject: [PATCH] perf: improve wait for results performances by reducing the size of the request and making multiple at the same time --- .../ArmoniK.Api.Client/EventsClientExt.cs | 141 ++++++++++-------- 1 file changed, 76 insertions(+), 65 deletions(-) diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs index 367675bfa..10f890134 100644 --- a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -32,6 +32,7 @@ using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Events; using ArmoniK.Api.gRPC.V1.Results; +using ArmoniK.Utils; using Grpc.Core; @@ -80,75 +81,85 @@ private static FiltersAnd ResultsFilter(string resultId) public static async Task WaitForResultsAsync(this Events.EventsClient client, string sessionId, ICollection resultIds, - CancellationToken cancellationToken) - { - var resultsNotFound = new HashSet(resultIds); - while (resultsNotFound.Any()) - { - using var streamingCall = client.GetEvents(new EventSubscriptionRequest + CancellationToken cancellationToken = default, + int bucket_size = 100, + int parallelism = 1) + => await resultIds.ToChunks(bucket_size) + .ParallelForEach(new ParallelTaskOptions + { + ParallelismLimit = parallelism, + }, + async results => + { + var resultsNotFound = new HashSet(results); + while (resultsNotFound.Any()) + { + using var streamingCall = client.GetEvents(new EventSubscriptionRequest + { + SessionId = sessionId, + ReturnedEvents = + { + EventsEnum.ResultStatusUpdate, + EventsEnum.NewResult, + }, + ResultsFilters = new Filters + { + Or = + { + resultsNotFound.Select(ResultsFilter), + }, + }, + }, + cancellationToken: cancellationToken); + try + { + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) + { + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && + resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) { - SessionId = sessionId, - ReturnedEvents = + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) { - EventsEnum.ResultStatusUpdate, - EventsEnum.NewResult, - }, - ResultsFilters = new Filters - { - Or = - { - resultsNotFound.Select(ResultsFilter), - }, - }, - }, - cancellationToken: cancellationToken); - try - { - while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) - { - var resp = streamingCall.ResponseStream.Current; - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) - { - if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); - if (!resultsNotFound.Any()) - { - break; - } - } + break; + } + } - if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) - { - throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); - } - } + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) - { - if (resp.NewResult.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.NewResult.ResultId); - if (!resultsNotFound.Any()) - { - break; - } - } + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && + resultsNotFound.Contains(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - if (resp.NewResult.Status == ResultStatus.Aborted) - { - throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted"); - } - } - } - } - catch (OperationCanceledException) - { - } - catch (RpcException) - { - } - } - } + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + } + } + catch (OperationCanceledException) + { + } + catch (RpcException) + { + } + } + }); } }