Skip to content

Commit

Permalink
refactor: improve events client extension to wait for result (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Feb 14, 2024
2 parents 8a250eb + 961fd1c commit 46ef2b9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 44 deletions.
101 changes: 57 additions & 44 deletions packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
// W. Kirschenmann <[email protected]>
// J. Gurhem <[email protected]>
// D. Dubuc <[email protected]>
Expand Down Expand Up @@ -28,10 +28,13 @@
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Exceptions;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Events;
using ArmoniK.Api.gRPC.V1.Results;

using Grpc.Core;

using JetBrains.Annotations;

namespace ArmoniK.Api.Client
Expand Down Expand Up @@ -80,61 +83,71 @@ public static async Task WaitForResultsAsync(this Events.EventsClient client,
CancellationToken cancellationToken)
{
var resultsNotFound = new HashSet<string>(resultIds);

using var streamingCall = client.GetEvents(new EventSubscriptionRequest
{
SessionId = sessionId,
ReturnedEvents =
while (resultsNotFound.Any())
{
using var streamingCall = client.GetEvents(new EventSubscriptionRequest
{
EventsEnum.ResultStatusUpdate,
EventsEnum.NewResult,
},
ResultsFilters = new Filters
{
Or =
SessionId = sessionId,
ReturnedEvents =
{
EventsEnum.ResultStatusUpdate,
EventsEnum.NewResult,
},
ResultsFilters = new Filters
{
resultIds.Select(ResultsFilter),
Or =
{
resultsNotFound.Select(ResultsFilter),
},
},
},
});

while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId))
},
cancellationToken: cancellationToken);
try
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId))
{
break;
}
}
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}
if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}

if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId))
{
break;
}
}
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new Exception($"Result {resp.NewResult.ResultId} has been aborted");
if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted");
}
}
}
}
catch (OperationCanceledException)
{
}
catch (RpcException)
{
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
// W. Kirschenmann <[email protected]>
// J. Gurhem <[email protected]>
// D. Dubuc <[email protected]>
// L. Ziane Khodja <[email protected]>
// F. Lemaitre <[email protected]>
// S. Djebbar <[email protected]>
// J. Fonseca <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;

namespace ArmoniK.Api.Common.Exceptions;

/// <summary>
/// Exception raised when results are aborted
/// </summary>
public class ResultAbortedException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="ResultAbortedException" /> with the specified error message
/// </summary>
/// <param name="message">The error message</param>
public ResultAbortedException(string message)
: base(message)
{
}
}

0 comments on commit 46ef2b9

Please sign in to comment.