Skip to content

Commit

Permalink
feat: more async (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo authored Jun 12, 2024
2 parents 00fa4ed + 0c7d21f commit 28b7ee5
Show file tree
Hide file tree
Showing 14 changed files with 1,565 additions and 757 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ on:
- main
- "[0-9]+.[0-9]+.x"
pull_request:
branches:
- main
- "[0-9]+.[0-9]+.x"


jobs:
versionning:
Expand Down
1,102 changes: 754 additions & 348 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs

Large diffs are not rendered by default.

188 changes: 0 additions & 188 deletions Client/src/Common/Submitter/ChannelPool.cs

This file was deleted.

38 changes: 34 additions & 4 deletions Client/src/Common/Submitter/ClientServiceConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading.Tasks;

using ArmoniK.Api.Client.Options;
using ArmoniK.Api.Client.Submitter;
using ArmoniK.Utils;

using Grpc.Net.Client;

using Microsoft.Extensions.Logging;

Expand All @@ -33,8 +38,8 @@ public class ClientServiceConnector
/// <param name="properties">Configuration Properties</param>
/// <param name="loggerFactory">Optional logger factory</param>
/// <returns>The connection pool</returns>
public static ChannelPool ControlPlaneConnectionPool(Properties properties,
ILoggerFactory? loggerFactory = null)
public static ObjectPool<GrpcChannel> ControlPlaneConnectionPool(Properties properties,
ILoggerFactory? loggerFactory = null)
{
var options = new GrpcClient
{
Expand All @@ -52,7 +57,32 @@ public static ChannelPool ControlPlaneConnectionPool(Properties properties,
ProxyPassword = properties.ProxyPassword,
};

return new ChannelPool(() => GrpcChannelFactory.CreateChannel(options,
loggerFactory?.CreateLogger(typeof(ClientServiceConnector))));
return new ObjectPool<GrpcChannel>(ct => new ValueTask<GrpcChannel>(GrpcChannelFactory.CreateChannel(options,
loggerFactory?.CreateLogger(typeof(ClientServiceConnector)))),


#if NET5_0_OR_GREATER
async (channel, ct) =>
{
switch (channel.State)
{
case ConnectivityState.TransientFailure:
await channel.ShutdownAsync()
.ConfigureAwait(false);
return false;
case ConnectivityState.Shutdown:
return false;
case ConnectivityState.Idle:
case ConnectivityState.Connecting:
case ConnectivityState.Ready:
default:
return true;
}
}
#else
(_,
_) => new ValueTask<bool>(true)
#endif
);
}
}
48 changes: 36 additions & 12 deletions Client/src/Common/Submitter/TasksClientExt.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-$CURRENT_YEAR$. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@

using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;

using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Tasks;
Expand Down Expand Up @@ -133,21 +135,25 @@ public static FiltersAnd TaskStatusFilter(TaskStatus status,
/// <param name="filters"> filters to apply on the tasks </param>
/// <param name="sort"> sorting order </param>
/// <param name="pageSize"> page size </param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IEnumerable<TaskSummary> ListTasks(this Tasks.TasksClient tasksClient,
Filters filters,
ListTasksRequest.Types.Sort sort,
int pageSize = 50)
public static async IAsyncEnumerable<TaskSummary> ListTasksAsync(this Tasks.TasksClient tasksClient,
Filters filters,
ListTasksRequest.Types.Sort sort,
int pageSize = 50,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var page = 0;
ListTasksResponse res;
while ((res = tasksClient.ListTasks(new ListTasksRequest
{
Filters = filters,
Sort = sort,
PageSize = pageSize,
Page = page,
})).Tasks.Any())
while ((res = await tasksClient.ListTasksAsync(new ListTasksRequest
{
Filters = filters,
Sort = sort,
PageSize = pageSize,
Page = page,
},
cancellationToken: cancellationToken)
.ConfigureAwait(false)).Tasks.Any())
{
foreach (var taskSummary in res.Tasks)
{
Expand All @@ -157,4 +163,22 @@ public static IEnumerable<TaskSummary> ListTasks(this Tasks.TasksClient tas
page++;
}
}

/// <summary>
/// List tasks while handling page size
/// </summary>
/// <param name="tasksClient"> the tasks client </param>
/// <param name="filters"> filters to apply on the tasks </param>
/// <param name="sort"> sorting order </param>
/// <param name="pageSize"> page size </param>
/// <returns></returns>
public static IEnumerable<TaskSummary> ListTasks(this Tasks.TasksClient tasksClient,
Filters filters,
ListTasksRequest.Types.Sort sort,
int pageSize = 50)
=> ListTasksAsync(tasksClient,
filters,
sort,
pageSize)
.ToEnumerable();
}
5 changes: 4 additions & 1 deletion Client/src/Symphony/ArmonikSymphonyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
using ArmoniK.DevelopmentKit.Client.Common;
using ArmoniK.DevelopmentKit.Client.Common.Submitter;
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.Utils;

using Grpc.Net.Client;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -63,7 +66,7 @@ public ArmonikSymphonyClient(IConfiguration configuration,
/// </summary>
public string SectionGrpc { get; set; } = "Grpc";

private ChannelPool GrpcPool { get; set; }
private ObjectPool<GrpcChannel> GrpcPool { get; set; }


private IConfiguration Configuration { get; }
Expand Down
Loading

0 comments on commit 28b7ee5

Please sign in to comment.