diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs new file mode 100644 index 000000000..e296a92da --- /dev/null +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -0,0 +1,141 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// W. Kirschenmann +// J. Gurhem +// D. Dubuc +// L. Ziane Khodja +// F. Lemaitre +// S. Djebbar +// J. Fonseca +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; +using ArmoniK.Api.gRPC.V1.Events; +using ArmoniK.Api.gRPC.V1.Results; + +using JetBrains.Annotations; + +namespace ArmoniK.Api.Client +{ + /// + /// extensions methods + /// + [PublicAPI] + public static class EventsClientExt + { + private static FiltersAnd ResultsFilter(string resultId) + => new() + { + And = + { + new FilterField + { + Field = new ResultField + { + ResultRawField = new ResultRawField + { + Field = ResultRawEnumField.ResultId, + }, + }, + FilterString = new FilterString + { + Operator = FilterStringOperator.Equal, + Value = resultId, + }, + }, + }, + }; + + /// + /// Wait until the given results are completed + /// + /// gRPC result client + /// The session ID in which the results are located + /// A collection of results to wait for + /// Token used to cancel the execution of the method + /// if a result is aborted + [PublicAPI] + public static async Task WaitForResultsAsync(this Events.EventsClient client, + string sessionId, + ICollection resultIds, + CancellationToken cancellationToken) + { + var resultsNotFound = new HashSet(resultIds); + + using var streamingCall = client.GetEvents(new EventSubscriptionRequest + { + SessionId = sessionId, + ReturnedEvents = + { + EventsEnum.ResultStatusUpdate, + EventsEnum.NewResult, + }, + ResultsFilters = new Filters + { + Or = + { + resultIds.Select(ResultsFilter), + }, + }, + }); + + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) + { + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } + + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } + + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) + { + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } + + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); + } + } + } + } + } +}