diff --git a/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/BaseClientSubmitter.cs b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/BaseClientSubmitter.cs index 235a9a04..16b019a3 100644 --- a/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/BaseClientSubmitter.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -60,17 +60,15 @@ public class BaseClientSubmitter /// /// Base Object for all Client submitter /// - /// Channel used to create grpc clients + /// Channel used to create grpc clients /// the logger factory to pass for root object /// The size of chunk to split the list of tasks - public BaseClientSubmitter(ChannelBase channelBase, + public BaseClientSubmitter(ChannelPool channelPool, [CanBeNull] ILoggerFactory loggerFactory = null, int chunkSubmitSize = 500) { + channelPool_ = channelPool; Logger = loggerFactory?.CreateLogger(); - TaskService = new Tasks.TasksClient(channelBase); - ResultService = new Results.ResultsClient(channelBase); - SubmitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channelBase); chunkSubmitSize_ = chunkSubmitSize; } @@ -85,14 +83,14 @@ public BaseClientSubmitter(ChannelBase channelBase, /// public Session SessionId { get; protected set; } - /// - /// Protects the access to gRPC service. - /// - private readonly SemaphoreSlim mutex_ = new(1); - #pragma warning restore CS1591 + /// + /// The channel pool to use for creating clients + /// + protected ChannelPool channelPool_; + /// /// The number of chunk to split the payloadsWithDependencies @@ -106,11 +104,6 @@ public BaseClientSubmitter(ChannelBase channelBase, [CanBeNull] protected ILogger Logger { get; set; } - /// - /// The submitter and receiver Service to submit, wait and get the result - /// - protected Api.gRPC.V1.Submitter.Submitter.SubmitterClient SubmitterService { get; set; } - /// /// Service for interacting with results /// @@ -140,15 +133,15 @@ public TaskStatus GetTaskStatus(string taskId) /// The list of taskIds /// public IEnumerable> GetTaskStatues(params string[] taskIds) - => mutex_.LockedExecute(() => SubmitterService.GetTaskStatus(new GetTaskStatusRequest - { - TaskIds = - { - taskIds, - }, - }) - .IdStatuses.Select(x => Tuple.Create(x.TaskId, - x.Status))); + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).GetTaskStatus(new GetTaskStatusRequest + { + TaskIds = + { + taskIds, + }, + }) + .IdStatuses.Select(x => Tuple.Create(x.TaskId, + x.Status))); /// /// Return the taskOutput when error occurred @@ -156,11 +149,11 @@ public IEnumerable> GetTaskStatues(params string[] tas /// /// public Output GetTaskOutputInfo(string taskId) - => mutex_.LockedExecute(() => SubmitterService.TryGetTaskOutput(new TaskOutputRequest - { - TaskId = taskId, - Session = SessionId.Id, - })); + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).TryGetTaskOutput(new TaskOutputRequest + { + TaskId = taskId, + Session = SessionId.Id, + })); /// @@ -199,16 +192,17 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskIds) { using var _ = Logger?.LogFunction(); + + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Retry.WhileException(5, 200, retry => { Logger?.LogDebug("Try {try} for {funcName}", retry, - nameof(SubmitterService.WaitForCompletion)); + nameof(submitterService.WaitForCompletion)); - var __ = mutex_.LockedExecute(() => SubmitterService.WaitForCompletion(new WaitRequest - { - Filter = new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskIds, - }, - }, - }, - StopOnFirstTaskCancellation = true, - StopOnFirstTaskError = true, - })); + var __ = submitterService.WaitForCompletion(new WaitRequest + { + Filter = new TaskFilter + { + Task = new TaskFilter.Types.IdsRequest + { + Ids = + { + taskIds, + }, + }, + }, + StopOnFirstTaskCancellation = true, + StopOnFirstTaskError = true, + }); }, true, typeof(IOException), @@ -405,21 +403,24 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, var result2TaskDic = mapTaskResults.ToDictionary(result => result.ResultIds.Single(), result => result.TaskId); + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + var idStatus = Retry.WhileException(5, 200, retry => { Logger?.LogDebug("Try {try} for {funcName}", retry, - nameof(SubmitterService.GetResultStatus)); - var resultStatusReply = mutex_.LockedExecute(() => SubmitterService.GetResultStatus(new GetResultStatusRequest - { - ResultIds = - { - result2TaskDic.Keys, - }, - SessionId = SessionId.Id, - })); + nameof(submitterService.GetResultStatus)); + var resultStatusReply = submitterService.GetResultStatus(new GetResultStatusRequest + { + ResultIds = + { + result2TaskDic.Keys, + }, + SessionId = SessionId.Id, + }); return resultStatusReply.IdStatuses; }, true, @@ -471,14 +472,14 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, } private ICollection GetResultIds(IEnumerable taskIds) - => mutex_.LockedExecute(() => TaskService.GetResultIds(new GetResultIdsRequest - { - TaskId = - { - taskIds, - }, - }) - .TaskResults); + => channelPool_.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest + { + TaskId = + { + taskIds, + }, + }) + .TaskResults); /// /// Try to find the result of One task. If there no result, the function return byte[0] @@ -504,15 +505,18 @@ public byte[] GetResult(string taskId, Session = SessionId.Id, }; + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Retry.WhileException(5, 200, retry => { Logger?.LogDebug("Try {try} for {funcName}", retry, - nameof(SubmitterService.WaitForAvailability)); - var availabilityReply = mutex_.LockedExecute(() => SubmitterService.WaitForAvailability(resultRequest, - cancellationToken: cancellationToken)); + nameof(submitterService.WaitForAvailability)); + var availabilityReply = submitterService.WaitForAvailability(resultRequest, + cancellationToken: cancellationToken); switch (availabilityReply.TypeCase) { @@ -582,10 +586,11 @@ public async Task TryGetResultAsync(ResultRequest resultRequest, List> chunks; int len; - using (await mutex_.LockGuardAsync() - .ConfigureAwait(false)) + using var channel = channelPool_.GetChannel(); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + { - var streamingCall = SubmitterService.TryGetResultStream(resultRequest, + var streamingCall = submitterService.TryGetResultStream(resultRequest, cancellationToken: cancellationToken); chunks = new List>(); len = 0; @@ -669,7 +674,8 @@ public byte[] TryGetResult(string taskId, try { var response = TryGetResultAsync(resultRequest, - cancellationToken); + cancellationToken) + .Result; return response; } catch (AggregateException ex) @@ -714,7 +720,7 @@ public byte[] TryGetResult(string taskId, typeof(IOException), typeof(RpcException)); - return resultReply.Result; + return resultReply; } /// diff --git a/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ChannelPool.cs b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ChannelPool.cs new file mode 100644 index 00000000..04055030 --- /dev/null +++ b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ChannelPool.cs @@ -0,0 +1,154 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2022. All rights reserved. +// W. Kirschenmann +// J. Gurhem +// D. Dubuc +// L. Ziane Khodja +// F. Lemaitre +// S. Djebbar +// J. Fonseca +// D. Brasseur +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System; +using System.Collections.Concurrent; + +using Grpc.Core; + +using JetBrains.Annotations; + +using Microsoft.Extensions.Logging; + +namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; + +/// +/// Helper to have a connection pool for gRPC services +/// +public sealed class ChannelPool +{ + private readonly Func channelFactory_; + + [CanBeNull] + private readonly ILogger logger_; + + private readonly ConcurrentBag pool_; + + /// + /// Constructs a new channelPool + /// + /// Function used to create new channels + /// loggerFactory used to instantiate a logger for the pool + public ChannelPool(Func channelFactory, + [CanBeNull] ILoggerFactory loggerFactory = null) + { + channelFactory_ = channelFactory; + pool_ = new ConcurrentBag(); + logger_ = loggerFactory?.CreateLogger(); + } + + /// + /// Get a channel from the pool. If the pool is empty, create a new channel + /// + /// A ChannelBase used by nobody else + private ChannelBase AcquireChannel() + { + if (pool_.TryTake(out var channel)) + { + logger_?.LogDebug("Acquired already existing channel {channel} from pool", + channel); + return channel; + } + + channel = channelFactory_(); + logger_?.LogInformation("Created and acquired new channel {channel} from pool", + channel); + return channel; + } + + /// + /// Release a ChannelBase to the pool that could be reused later by someone else + /// + /// Channel to release + private void ReleaseChannel(ChannelBase channel) + { + logger_?.LogDebug("Released channel {channel} to pool", + channel); + pool_.Add(channel); + } + + /// + /// Get a channel that will be automatically released when disposed + /// + /// + public ChannelGuard GetChannel() + => new(this); + + /// + /// Call f with an acquired channel + /// + /// Function to be called + /// Type of the return type of f + /// Value returned by f + public T WithChannel(Func f) + { + using var channel = GetChannel(); + return f(channel.Channel); + } + + /// + /// Call f with an acquired channel + /// + /// Function to be called + public void WithChannel(Action f) + { + using var channel = GetChannel(); + f(channel.Channel); + } + + /// + /// Helper class that acquires a channel from a pool when constructed, and releases it when disposed + /// + public sealed class ChannelGuard : IDisposable + { + /// + /// Channel that is used by nobody else + /// + public readonly ChannelBase Channel; + + private readonly ChannelPool pool_; + + /// + /// Acquire a channel that will be released when disposed + /// + /// + public ChannelGuard(ChannelPool channelPool) + { + pool_ = channelPool; + Channel = channelPool.AcquireChannel(); + } + + /// + public void Dispose() + => pool_.ReleaseChannel(Channel); + + /// + /// Implicit convert a ChannelGuard into a ChannelBase + /// + /// ChannelGuard + /// ChannelBase + public static implicit operator ChannelBase(ChannelGuard guard) + => guard.Channel; + } +} diff --git a/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ClientServiceConnector.cs b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ClientServiceConnector.cs index a31c1df5..ab3bbbe9 100644 --- a/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ClientServiceConnector.cs +++ b/Client/src/ArmoniK.DevelopmentKit.Client.Common/Submitter/ClientServiceConnector.cs @@ -1,19 +1,18 @@ -#if NET5_0_OR_GREATER -using Grpc.Net.Client; -using Grpc.Core; - -using System.Runtime.InteropServices; -using System.Security.Cryptography.X509Certificates; -#else -using Grpc.Core; -#endif using System; using System.IO; using System.Net.Http; +using Grpc.Core; + using JetBrains.Annotations; using Microsoft.Extensions.Logging; +#if NET5_0_OR_GREATER +using Grpc.Net.Client; + +using System.Runtime.InteropServices; +using System.Security.Cryptography.X509Certificates; +#endif namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; @@ -30,9 +29,9 @@ public class ClientServiceConnector /// Optional : Check if the ssl must have a strong validation (default true) /// Optional : the logger factory to create the logger /// - public static ChannelBase ControlPlaneConnection(string endPoint, - bool sslValidation = true, - ILoggerFactory loggerFactory = null) + private static ChannelBase ControlPlaneConnection(string endPoint, + bool sslValidation = true, + ILoggerFactory loggerFactory = null) => ControlPlaneConnection(endPoint, "", "", @@ -42,17 +41,17 @@ public static ChannelBase ControlPlaneConnection(string endPoint, /// /// Open Connection with the control plane with mTLS authentication /// - /// + /// The address and port of control plane /// The certificate filename in a pem format /// The client key filename in a pem format /// Check if the ssl must have a strong validation /// Optional logger factory /// - public static ChannelBase ControlPlaneConnection(string endPoint, - string clientCertFilename, - string clientKeyFilename, - bool sslValidation = true, - [CanBeNull] ILoggerFactory loggerFactory = null) + private static ChannelBase ControlPlaneConnection(string endPoint, + string clientCertFilename, + string clientKeyFilename, + bool sslValidation = true, + [CanBeNull] ILoggerFactory loggerFactory = null) { var logger = loggerFactory?.CreateLogger(); if ((!string.IsNullOrEmpty(clientCertFilename) && string.IsNullOrEmpty(clientKeyFilename)) || @@ -99,15 +98,15 @@ public static ChannelBase ControlPlaneConnection(string endP /// /// Open Connection with the control plane with mTLS authentication /// - /// + /// The address and port of control plane /// The pair certificate + key data in a pem format /// Check if the ssl must have a strong validation /// Optional logger factory /// - public static ChannelBase ControlPlaneConnection(string endPoint, - Tuple clientPem = null, - bool sslValidation = true, - [CanBeNull] ILoggerFactory loggerFactory = null) + private static ChannelBase ControlPlaneConnection(string endPoint, + Tuple clientPem = null, + bool sslValidation = true, + [CanBeNull] ILoggerFactory loggerFactory = null) { var uri = new Uri(endPoint); @@ -168,4 +167,59 @@ public static ChannelBase ControlPlaneConnection(string endP #endif return channel; } + + /// + /// Create a connection pool to the control plane with or without SSL and no mTLS + /// + /// The address and port of control plane + /// Optional : Check if the ssl must have a strong validation (default true) + /// Optional : the logger factory to create the logger + /// + public static ChannelPool ControlPlaneConnectionPool(string endPoint, + bool sslValidation = true, + ILoggerFactory loggerFactory = null) + => new(() => ControlPlaneConnection(endPoint, + sslValidation, + loggerFactory), + loggerFactory); + + + /// + /// Create a connection pool to the control plane with mTLS authentication + /// + /// The address and port of control plane + /// The certificate filename in a pem format + /// The client key filename in a pem format + /// Check if the ssl must have a strong validation + /// Optional logger factory + /// + public static ChannelPool ControlPlaneConnectionPool(string endPoint, + string clientCertFilename, + string clientKeyFilename, + bool sslValidation = true, + [CanBeNull] ILoggerFactory loggerFactory = null) + => new(() => ControlPlaneConnection(endPoint, + clientCertFilename, + clientKeyFilename, + sslValidation, + loggerFactory), + loggerFactory); + + /// + /// Create a connection pool to the control plane with mTLS authentication + /// + /// The address and port of control plane + /// The pair certificate + key data in a pem format + /// Check if the ssl must have a strong validation + /// Optional logger factory + /// The connection pool + public static ChannelPool ControlPlaneConnectionPool(string endPoint, + Tuple clientPem = null, + bool sslValidation = true, + [CanBeNull] ILoggerFactory loggerFactory = null) + => new(() => ControlPlaneConnection(endPoint, + clientPem, + sslValidation, + loggerFactory), + loggerFactory); } diff --git a/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/ArmonikDataSynapseClientService.cs b/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/ArmonikDataSynapseClientService.cs index f04e2199..fd588719 100644 --- a/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/ArmonikDataSynapseClientService.cs +++ b/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/ArmonikDataSynapseClientService.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,8 +27,6 @@ using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -66,7 +64,7 @@ public ArmonikDataSynapseClientService(Properties properties, [CanBeNull] private ILogger Logger { get; } - private ChannelBase GrpcChannel { get; set; } + private ChannelPool GrpcPool { get; set; } /// @@ -92,24 +90,24 @@ public SessionService CreateSession(TaskOptions taskOptions = null) Logger?.LogDebug("Creating Session... "); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, TaskOptions); } private void ControlPlaneConnection() { - if (GrpcChannel != null) + if (GrpcPool != null) { return; } - GrpcChannel = ClientServiceConnector.ControlPlaneConnection(properties_.ConnectionString, - properties_.ClientCertFilePem, - properties_.ClientKeyFilePem, - properties_.ConfSSLValidation, - LoggerFactory); + GrpcPool = ClientServiceConnector.ControlPlaneConnectionPool(properties_.ConnectionString, + properties_.ClientCertFilePem, + properties_.ClientKeyFilePem, + properties_.ConfSSLValidation, + LoggerFactory); } /// @@ -122,7 +120,7 @@ public SessionService OpenSession(string sessionId, { ControlPlaneConnection(); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, clientOptions ?? SessionService.InitializeDefaultTaskOptions(), new Session diff --git a/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/SessionService.cs b/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/SessionService.cs index 0aa6716c..8a89a7fc 100644 --- a/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/SessionService.cs +++ b/DataSynapseApi/ArmoniK.DevelopmentKit.GridServer.Client/SessionService.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,8 +34,6 @@ using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -53,11 +51,11 @@ public class SessionService : BaseClientSubmitter /// Ctor to instantiate a new SessionService /// This is an object to send task or get Results from a session /// - public SessionService(ChannelBase grpcChannel, + public SessionService(ChannelPool grpcPool, [CanBeNull] ILoggerFactory loggerFactory = null, [CanBeNull] TaskOptions taskOptions = null, [CanBeNull] Session sessionId = null) - : base(grpcChannel, + : base(grpcPool, loggerFactory) { TaskOptions = InitializeDefaultTaskOptions(); @@ -115,7 +113,7 @@ private Session CreateSession(IEnumerable partitionIds) partitionIds, }, }; - var session = SubmitterService.CreateSession(createSessionRequest); + var session = channelPool_.WithChannel(channel => new Submitter.SubmitterClient(channel).CreateSession(createSessionRequest)); return new Session { @@ -157,7 +155,7 @@ public IEnumerable SubmitTasks(IEnumerable payloads) /// public string SubmitTask(byte[] payload) { - Thread.Sleep(2); // Twice the keep alive + Thread.Sleep(2); // Twice the keep alive return SubmitTasks(new[] { payload, diff --git a/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/ArmonikSymphonyClient.cs b/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/ArmonikSymphonyClient.cs index ff1afb48..eda497d0 100644 --- a/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/ArmonikSymphonyClient.cs +++ b/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/ArmonikSymphonyClient.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,8 +26,6 @@ using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.SymphonyApi.Client.api; -using Grpc.Core; - using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -83,7 +81,7 @@ public ArmonikSymphonyClient(IConfiguration configuration, private static string SectionClientCertFile { get; } = "ClientCert"; private static string SectionClientKeyFile { get; } = "ClientKey"; - private ChannelBase GrpcChannel { get; set; } + private ChannelPool GrpcPool { get; set; } private IConfiguration Configuration { get; } @@ -97,7 +95,7 @@ public SessionService CreateSession(TaskOptions taskOptions = null) { ControlPlaneConnection(); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, taskOptions); } @@ -113,7 +111,7 @@ public SessionService OpenSession(Session sessionId, { ControlPlaneConnection(); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, clientOptions ?? SessionService.InitializeDefaultTaskOptions(), sessionId); @@ -121,7 +119,7 @@ public SessionService OpenSession(Session sessionId, private void ControlPlaneConnection() { - if (GrpcChannel != null) + if (GrpcPool != null) { return; } @@ -154,10 +152,10 @@ private void ControlPlaneConnection() sslValidation = false; } - GrpcChannel = ClientServiceConnector.ControlPlaneConnection(controlPlanSection_[SectionEndPoint], - clientCertFilename, - clientKeyFilename, - sslValidation, - LoggerFactory); + GrpcPool = ClientServiceConnector.ControlPlaneConnectionPool(controlPlanSection_[SectionEndPoint], + clientCertFilename, + clientKeyFilename, + sslValidation, + LoggerFactory); } } diff --git a/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/SessionService.cs b/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/SessionService.cs index 527a5a51..2e8211d9 100644 --- a/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/SessionService.cs +++ b/SymphonyApi/ArmoniK.DevelopmentKit.SymphonyApi.Client/api/SessionService.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,8 +33,6 @@ using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -52,11 +50,11 @@ public class SessionService : BaseClientSubmitter /// Ctor to instantiate a new SessionService /// This is an object to send task or get Results from a session /// - public SessionService(ChannelBase channel, + public SessionService(ChannelPool channelPool, [CanBeNull] ILoggerFactory loggerFactory = null, [CanBeNull] TaskOptions taskOptions = null, [CanBeNull] Session session = null) - : base(channel, + : base(channelPool, loggerFactory) { TaskOptions = InitializeDefaultTaskOptions(); @@ -110,7 +108,7 @@ private Session CreateSession(IEnumerable partitionIds) partitionIds, }, }; - var session = SubmitterService.CreateSession(createSessionRequest); + var session = channelPool_.WithChannel(channel => new Submitter.SubmitterClient(channel).CreateSession(createSessionRequest)); return new Session { diff --git a/UnifiedApi/Client/Factory/SessionServiceFactory.cs b/UnifiedApi/Client/Factory/SessionServiceFactory.cs index 4734de1f..2442280e 100644 --- a/UnifiedApi/Client/Factory/SessionServiceFactory.cs +++ b/UnifiedApi/Client/Factory/SessionServiceFactory.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,8 +30,6 @@ using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -62,7 +60,7 @@ public SessionServiceFactory([CanBeNull] ILoggerFactory loggerFactory = null) [CanBeNull] private ILogger Logger { get; } - private ChannelBase GrpcChannel { get; set; } + private ChannelPool GrpcPool { get; set; } private ILoggerFactory LoggerFactory { get; } @@ -78,24 +76,24 @@ public SessionService CreateSession(Properties properties) Logger?.LogDebug("Creating Session... "); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, properties.TaskOptions); } private void ControlPlaneConnection(Properties properties) { - if (GrpcChannel != null) + if (GrpcPool != null) { return; } - GrpcChannel = ClientServiceConnector.ControlPlaneConnection(properties.ConnectionString, - properties.ClientCertFilePem, - properties.ClientKeyFilePem, - properties.ConfSSLValidation, - LoggerFactory); + GrpcPool = ClientServiceConnector.ControlPlaneConnectionPool(properties.ConnectionString, + properties.ClientCertFilePem, + properties.ClientKeyFilePem, + properties.ConfSSLValidation, + LoggerFactory); } /// @@ -110,7 +108,7 @@ public SessionService OpenSession(Properties properties, { ControlPlaneConnection(properties); - return new SessionService(GrpcChannel, + return new SessionService(GrpcPool, LoggerFactory, clientOptions, new Session @@ -154,7 +152,7 @@ public AdminMonitoringService GetAdminMonitoringService(Properties properties) { ControlPlaneConnection(properties); - return new AdminMonitoringService(GrpcChannel, + return new AdminMonitoringService(GrpcPool, LoggerFactory); } } diff --git a/UnifiedApi/Client/Services/Admin/AdminMonitoringService.cs b/UnifiedApi/Client/Services/Admin/AdminMonitoringService.cs index 83645884..5dae7fad 100644 --- a/UnifiedApi/Client/Services/Admin/AdminMonitoringService.cs +++ b/UnifiedApi/Client/Services/Admin/AdminMonitoringService.cs @@ -4,8 +4,7 @@ using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Submitter; - -using Grpc.Core; +using ArmoniK.DevelopmentKit.Client.Common.Submitter; using JetBrains.Annotations; @@ -18,33 +17,30 @@ namespace ArmoniK.DevelopmentKit.Client.Services.Admin; /// public class AdminMonitoringService { + private readonly ChannelPool channelPool_; + /// /// The constructor to instantiate this service /// /// The entry point to the control plane /// The factory logger to create logger - public AdminMonitoringService(ChannelBase channel, + public AdminMonitoringService(ChannelPool channelPool, [CanBeNull] ILoggerFactory loggerFactory = null) { - Logger = loggerFactory?.CreateLogger(); - ControlPlaneService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + Logger = loggerFactory?.CreateLogger(); + channelPool_ = channelPool; } [CanBeNull] private ILogger Logger { get; } - /// - /// The control plane service to request Grpc API - /// - private Api.gRPC.V1.Submitter.Submitter.SubmitterClient ControlPlaneService { get; } - /// /// Returns the service configuration /// public void GetServiceConfiguration() { - var configuration = ControlPlaneService.GetServiceConfiguration(new Empty()); + var configuration = channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).GetServiceConfiguration(new Empty())); Logger?.LogInformation($"This configuration will be update in the nex version [ {configuration} ]"); } @@ -55,10 +51,10 @@ public void GetServiceConfiguration() /// /// the sessionId of the session to cancel public void CancelSession(string sessionId) - => ControlPlaneService.CancelSession(new Session - { - Id = sessionId, - }); + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CancelSession(new Session + { + Id = sessionId, + })); /// /// Return the filtered list of task of a session @@ -66,25 +62,25 @@ public void CancelSession(string sessionId) /// The filter to apply on list of task /// The list of filtered task public IEnumerable ListTasks(TaskFilter taskFilter) - => ControlPlaneService.ListTasks(taskFilter) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(taskFilter) + .TaskIds); /// /// Return the whole list of task of a session /// /// The list of filtered task public IEnumerable ListAllTasksBySession(string sessionId) - => ControlPlaneService.ListTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - }) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + }) + .TaskIds); /// /// Return the list of task of a session filtered by status @@ -92,24 +88,24 @@ public IEnumerable ListAllTasksBySession(string sessionId) /// The list of filtered task public IEnumerable ListTasksBySession(string sessionId, params TaskStatus[] taskStatus) - => ControlPlaneService.ListTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - taskStatus, - }, - }, - }) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + taskStatus, + }, + }, + }) + .TaskIds); /// /// Return the list of running tasks of a session @@ -118,27 +114,27 @@ public IEnumerable ListTasksBySession(string sessionId, /// The list of filtered task [Obsolete] public IEnumerable ListRunningTasks(string sessionId) - => ControlPlaneService.ListTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Creating, - TaskStatus.Dispatched, - TaskStatus.Processing, - TaskStatus.Submitted, - }, - }, - }) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Creating, + TaskStatus.Dispatched, + TaskStatus.Processing, + TaskStatus.Submitted, + }, + }, + }) + .TaskIds); /// /// Return the list of error tasks of a session @@ -147,25 +143,25 @@ public IEnumerable ListRunningTasks(string sessionId) /// The list of filtered task [Obsolete] public IEnumerable ListErrorTasks(string sessionId) - => ControlPlaneService.ListTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Error, - TaskStatus.Timeout, - }, - }, - }) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Error, + TaskStatus.Timeout, + }, + }, + }) + .TaskIds); /// /// Return the list of canceled tasks of a session @@ -174,33 +170,33 @@ public IEnumerable ListErrorTasks(string sessionId) /// The list of filtered task [Obsolete] public IEnumerable ListCanceledTasks(string sessionId) - => ControlPlaneService.ListTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Canceled, - TaskStatus.Canceling, - }, - }, - }) - .TaskIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Canceled, + TaskStatus.Canceling, + }, + }, + }) + .TaskIds); /// /// Return the list of all sessions /// /// The list of filtered session public IEnumerable ListAllSessions() - => ControlPlaneService.ListSessions(new SessionFilter()) - .SessionIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListSessions(new SessionFilter()) + .SessionIds); /// /// The method is to get a filtered list of session @@ -208,42 +204,42 @@ public IEnumerable ListAllSessions() /// The filter to apply on the request /// returns a list of session filtered public IEnumerable ListSessions(SessionFilter sessionFilter) - => ControlPlaneService.ListSessions(sessionFilter) - .SessionIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListSessions(sessionFilter) + .SessionIds); /// /// The method is to get a filtered list of running session /// /// returns a list of session filtered public IEnumerable ListRunningSessions() - => ControlPlaneService.ListSessions(new SessionFilter - { - Included = new SessionFilter.Types.StatusesRequest - { - Statuses = - { - SessionStatus.Running, - }, - }, - }) - .SessionIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListSessions(new SessionFilter + { + Included = new SessionFilter.Types.StatusesRequest + { + Statuses = + { + SessionStatus.Running, + }, + }, + }) + .SessionIds); /// /// The method is to get a filtered list of running session /// /// returns a list of session filtered public IEnumerable ListCanceledSessions() - => ControlPlaneService.ListSessions(new SessionFilter - { - Included = new SessionFilter.Types.StatusesRequest - { - Statuses = - { - SessionStatus.Canceled, - }, - }, - }) - .SessionIds; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).ListSessions(new SessionFilter + { + Included = new SessionFilter.Types.StatusesRequest + { + Statuses = + { + SessionStatus.Canceled, + }, + }, + }) + .SessionIds); /// /// The method is to get the number of filtered tasks @@ -251,25 +247,25 @@ public IEnumerable ListCanceledSessions() /// the filter to apply on tasks /// return the number of task public int CountTasks(TaskFilter taskFilter) - => ControlPlaneService.CountTasks(taskFilter) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(taskFilter) + .Values.Count); /// /// The method is to get the number of all task in a session /// /// return the number of task public int CountAllTasksBySession(string sessionId) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + }) + .Values.Count); /// @@ -277,52 +273,52 @@ public int CountAllTasksBySession(string sessionId) /// /// return the number of task public int CountRunningTasksBySession(string sessionId) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Creating, - TaskStatus.Dispatched, - TaskStatus.Processing, - TaskStatus.Submitted, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Creating, + TaskStatus.Dispatched, + TaskStatus.Processing, + TaskStatus.Submitted, + }, + }, + }) + .Values.Count); /// /// The method is to get the number of error tasks in the session /// /// return the number of task public int CountErrorTasksBySession(string sessionId) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Error, - TaskStatus.Timeout, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Error, + TaskStatus.Timeout, + }, + }, + }) + .Values.Count); /// /// Count task in a session and select by status @@ -332,24 +328,24 @@ public int CountErrorTasksBySession(string sessionId) /// return the number of task public int CountTaskBySession(string sessionId, params TaskStatus[] taskStatus) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - taskStatus, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + taskStatus, + }, + }, + }) + .Values.Count); /// /// The method is to get the number of error tasks in the session @@ -357,25 +353,25 @@ public int CountTaskBySession(string sessionId, /// return the number of task [Obsolete] public int CountCancelTasksBySession(string sessionId) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Canceling, - TaskStatus.Canceled, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Canceling, + TaskStatus.Canceled, + }, + }, + }) + .Values.Count); /// /// The method is to get the number of error tasks in the session @@ -383,40 +379,40 @@ public int CountCancelTasksBySession(string sessionId) /// return the number of task [Obsolete] public int CountCompletedTasksBySession(string sessionId) - => ControlPlaneService.CountTasks(new TaskFilter - { - Session = new TaskFilter.Types.IdsRequest - { - Ids = - { - sessionId, - }, - }, - Included = new TaskFilter.Types.StatusesRequest - { - Statuses = - { - TaskStatus.Completed, - }, - }, - }) - .Values.Count; + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CountTasks(new TaskFilter + { + Session = new TaskFilter.Types.IdsRequest + { + Ids = + { + sessionId, + }, + }, + Included = new TaskFilter.Types.StatusesRequest + { + Statuses = + { + TaskStatus.Completed, + }, + }, + }) + .Values.Count); /// /// Cancel a list of task in a session /// /// the taskIds list to cancel public void CancelTasksBySession(IEnumerable taskIds) - => ControlPlaneService.CancelTasks(new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskIds, - }, - }, - }); + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CancelTasks(new TaskFilter + { + Task = new TaskFilter.Types.IdsRequest + { + Ids = + { + taskIds, + }, + }, + })); /// /// The method to get status of a list of tasks @@ -424,15 +420,15 @@ public void CancelTasksBySession(IEnumerable taskIds) /// The list of task /// returns a list of pair TaskId/TaskStatus public IEnumerable> GetTaskStatus(IEnumerable taskIds) - => ControlPlaneService.GetTaskStatus(new GetTaskStatusRequest - { - TaskIds = - { - taskIds, - }, - }) - .IdStatuses.Select(idsStatus => Tuple.Create(idsStatus.TaskId, - idsStatus.Status)); + => channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).GetTaskStatus(new GetTaskStatusRequest + { + TaskIds = + { + taskIds, + }, + }) + .IdStatuses.Select(idsStatus => Tuple.Create(idsStatus.TaskId, + idsStatus.Status))); private void UploadResources(string path) diff --git a/UnifiedApi/Client/Services/SessionService.cs b/UnifiedApi/Client/Services/SessionService.cs index 1a6c946c..3ade0501 100644 --- a/UnifiedApi/Client/Services/SessionService.cs +++ b/UnifiedApi/Client/Services/SessionService.cs @@ -1,5 +1,5 @@ // This file is part of the ArmoniK project -// +// // Copyright (C) ANEO, 2021-2022. // W. Kirschenmann // J. Gurhem @@ -8,13 +8,13 @@ // F. Lemaitre // S. Djebbar // J. Fonseca -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,8 +34,6 @@ using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - using JetBrains.Annotations; using Microsoft.Extensions.Logging; @@ -53,11 +51,11 @@ public class SessionService : BaseClientSubmitter /// Ctor to instantiate a new SessionService /// This is an object to send task or get Results from a session /// - public SessionService(ChannelBase channel, + public SessionService(ChannelPool channelPool, [CanBeNull] ILoggerFactory loggerFactory = null, [CanBeNull] TaskOptions taskOptions = null, [CanBeNull] Session session = null) - : base(channel, + : base(channelPool, loggerFactory) { TaskOptions = InitializeDefaultTaskOptions(); @@ -123,7 +121,7 @@ private Session CreateSession(IEnumerable partitionIds) partitionIds, }, }; - var session = SubmitterService.CreateSession(createSessionRequest); + var session = channelPool_.WithChannel(channel => new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel).CreateSession(createSessionRequest)); return new Session { @@ -167,7 +165,7 @@ public IEnumerable SubmitTasks(IEnumerable payloads) public string SubmitTask(byte[] payload, int waitTimeBeforeNextSubmit = 2) { - Thread.Sleep(waitTimeBeforeNextSubmit); // Twice the keep alive + Thread.Sleep(waitTimeBeforeNextSubmit); // Twice the keep alive return SubmitTasks(new[] { payload,