From 961fd1cc44e1cc3e493719394107b26b0159eee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Wed, 14 Feb 2024 09:50:35 +0100 Subject: [PATCH] refactor: improve events client extension to wait for result --- .../ArmoniK.Api.Client/EventsClientExt.cs | 101 ++++++++++-------- .../Exceptions/ResultAbortedException.cs | 42 ++++++++ 2 files changed, 99 insertions(+), 44 deletions(-) create mode 100644 packages/csharp/ArmoniK.Api.Common/Exceptions/ResultAbortedException.cs diff --git a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs index e296a92da..367675bfa 100644 --- a/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs +++ b/packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // W. Kirschenmann // J. Gurhem // D. Dubuc @@ -28,10 +28,13 @@ using System.Threading; using System.Threading.Tasks; +using ArmoniK.Api.Common.Exceptions; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Events; using ArmoniK.Api.gRPC.V1.Results; +using Grpc.Core; + using JetBrains.Annotations; namespace ArmoniK.Api.Client @@ -80,61 +83,71 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client, CancellationToken cancellationToken) { var resultsNotFound = new HashSet(resultIds); - - using var streamingCall = client.GetEvents(new EventSubscriptionRequest - { - SessionId = sessionId, - ReturnedEvents = + while (resultsNotFound.Any()) + { + using var streamingCall = client.GetEvents(new EventSubscriptionRequest { - EventsEnum.ResultStatusUpdate, - EventsEnum.NewResult, - }, - ResultsFilters = new Filters - { - Or = + SessionId = sessionId, + ReturnedEvents = + { + EventsEnum.ResultStatusUpdate, + EventsEnum.NewResult, + }, + ResultsFilters = new Filters { - resultIds.Select(ResultsFilter), + Or = + { + resultsNotFound.Select(ResultsFilter), + }, }, - }, - }); - - while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) - { - cancellationToken.ThrowIfCancellationRequested(); - var resp = streamingCall.ResponseStream.Current; - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) + }, + cancellationToken: cancellationToken); + try { - if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + while (await streamingCall.ResponseStream.MoveNext(cancellationToken)) { - resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); - if (!resultsNotFound.Any()) + var resp = streamingCall.ResponseStream.Current; + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId)) { - break; - } - } + if (resp.ResultStatusUpdate.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); - } - } + if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted"); + } + } - if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) - { - if (resp.NewResult.Status == ResultStatus.Completed) - { - resultsNotFound.Remove(resp.NewResult.ResultId); - if (!resultsNotFound.Any()) + if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId)) { - break; - } - } + if (resp.NewResult.Status == ResultStatus.Completed) + { + resultsNotFound.Remove(resp.NewResult.ResultId); + if (!resultsNotFound.Any()) + { + break; + } + } - if (resp.NewResult.Status == ResultStatus.Aborted) - { - throw new Exception($"Result {resp.NewResult.ResultId} has been aborted"); + if (resp.NewResult.Status == ResultStatus.Aborted) + { + throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted"); + } + } } } + catch (OperationCanceledException) + { + } + catch (RpcException) + { + } } } } diff --git a/packages/csharp/ArmoniK.Api.Common/Exceptions/ResultAbortedException.cs b/packages/csharp/ArmoniK.Api.Common/Exceptions/ResultAbortedException.cs new file mode 100644 index 000000000..d5876143e --- /dev/null +++ b/packages/csharp/ArmoniK.Api.Common/Exceptions/ResultAbortedException.cs @@ -0,0 +1,42 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2024. All rights reserved. +// W. Kirschenmann +// J. Gurhem +// D. Dubuc +// L. Ziane Khodja +// F. Lemaitre +// S. Djebbar +// J. Fonseca +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; + +namespace ArmoniK.Api.Common.Exceptions; + +/// +/// Exception raised when results are aborted +/// +public class ResultAbortedException : Exception +{ + /// + /// Initializes a new instance of the with the specified error message + /// + /// The error message + public ResultAbortedException(string message) + : base(message) + { + } +}