Skip to content

Commit

Permalink
perf: improve wait for results performances by reducing the size of t…
Browse files Browse the repository at this point in the history
…he request and making multiple at the same time
  • Loading branch information
aneojgurhem committed Aug 19, 2024
1 parent 3f2c13f commit d23e25e
Showing 1 changed file with 76 additions and 65 deletions.
141 changes: 76 additions & 65 deletions packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Events;
using ArmoniK.Api.gRPC.V1.Results;
using ArmoniK.Utils;

using Grpc.Core;

Expand Down Expand Up @@ -80,75 +81,85 @@ private static FiltersAnd ResultsFilter(string resultId)
public static async Task WaitForResultsAsync(this Events.EventsClient client,
string sessionId,
ICollection<string> resultIds,
CancellationToken cancellationToken)
{
var resultsNotFound = new HashSet<string>(resultIds);
while (resultsNotFound.Any())
{
using var streamingCall = client.GetEvents(new EventSubscriptionRequest
CancellationToken cancellationToken = default,
int bucket_size = 100,

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2019, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2019, windows-x64, GrpcWebHandler)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2022, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2022, windows-x64, GrpcWebHandler)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2019, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2019, windows-x64, GrpcWebHandler)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2022, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2022, windows-x64, GrpcWebHandler)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, ubuntu-latest, linux-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, windows-2019, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, windows-2022, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, ubuntu-latest, linux-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, windows-2019, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, windows-2022, windows-x64)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Release C# Packages (packages/csharp/ArmoniK.Api.Client/ArmoniK.Api.Client.csproj)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 85 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Release C# Packages (packages/csharp/ArmoniK.Api.Client/ArmoniK.Api.Client.csproj)

Parameter 'bucket_size' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)
int parallelism = 1)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2019, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2019, windows-x64, GrpcWebHandler)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2022, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.7, windows-2022, windows-x64, GrpcWebHandler)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2019, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2019, windows-x64, GrpcWebHandler)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2022, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (net4.8, windows-2022, windows-x64, GrpcWebHandler)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, ubuntu-latest, linux-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, windows-2019, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (6, net6.0, windows-2022, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, ubuntu-latest, linux-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, windows-2019, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Test C# (8, net8.0, windows-2022, windows-x64)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Release C# Packages (packages/csharp/ArmoniK.Api.Client/ArmoniK.Api.Client.csproj)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)

Check warning on line 86 in packages/csharp/ArmoniK.Api.Client/EventsClientExt.cs

View workflow job for this annotation

GitHub Actions / Release C# Packages (packages/csharp/ArmoniK.Api.Client/ArmoniK.Api.Client.csproj)

Parameter 'parallelism' has no matching param tag in the XML comment for 'EventsClientExt.WaitForResultsAsync(Events.EventsClient, string, ICollection<string>, CancellationToken, int, int)' (but other parameters do)
=> await resultIds.ToChunks(bucket_size)
.ParallelForEach(new ParallelTaskOptions
{
ParallelismLimit = parallelism,
},
async results =>
{
var resultsNotFound = new HashSet<string>(results);
while (resultsNotFound.Any())
{
using var streamingCall = client.GetEvents(new EventSubscriptionRequest
{
SessionId = sessionId,
ReturnedEvents =
{
EventsEnum.ResultStatusUpdate,
EventsEnum.NewResult,
},
ResultsFilters = new Filters
{
Or =
{
resultsNotFound.Select(ResultsFilter),
},
},
},
cancellationToken: cancellationToken);
try
{
while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate &&
resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId))
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
SessionId = sessionId,
ReturnedEvents =
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
EventsEnum.ResultStatusUpdate,
EventsEnum.NewResult,
},
ResultsFilters = new Filters
{
Or =
{
resultsNotFound.Select(ResultsFilter),
},
},
},
cancellationToken: cancellationToken);
try
{
while (await streamingCall.ResponseStream.MoveNext(cancellationToken))
{
var resp = streamingCall.ResponseStream.Current;
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.ResultStatusUpdate && resultsNotFound.Contains(resp.ResultStatusUpdate.ResultId))
{
if (resp.ResultStatusUpdate.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.ResultStatusUpdate.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}
break;
}
}

if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}
if (resp.ResultStatusUpdate.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.ResultStatusUpdate.ResultId} has been aborted");
}
}

if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult && resultsNotFound.Contains(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}
if (resp.UpdateCase == EventSubscriptionResponse.UpdateOneofCase.NewResult &&
resultsNotFound.Contains(resp.NewResult.ResultId))
{
if (resp.NewResult.Status == ResultStatus.Completed)
{
resultsNotFound.Remove(resp.NewResult.ResultId);
if (!resultsNotFound.Any())
{
break;
}
}

if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted");
}
}
}
}
catch (OperationCanceledException)
{
}
catch (RpcException)
{
}
}
}
if (resp.NewResult.Status == ResultStatus.Aborted)
{
throw new ResultAbortedException($"Result {resp.NewResult.ResultId} has been aborted");
}
}
}
}
catch (OperationCanceledException)
{
}
catch (RpcException)
{
}
}
});
}
}

0 comments on commit d23e25e

Please sign in to comment.