From 685d2e9b4b35bef149c903801e0315ee52892e33 Mon Sep 17 00:00:00 2001
From: Wilfried Kirschenmann <17836122+wkirschenmann@users.noreply.github.com>
Date: Fri, 25 Aug 2023 06:58:07 +0200
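Subject: [PATCH] Deactivate legacy retry policies

Remove the Retry.WhileException wrappers around the ArmoniKClient calls in
BaseClientSubmitter and in the unified Service, and pass the retry count and
delay directly to the ArmoniKClient methods instead.

Also remove the public ChannelPool property from BaseClientSubmitter, drop
[Obsolete] from TryGetResultAsync and make it return null when the result is
not found, and make TryGetResult delegate to TryGetResultAsync.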
---
.../Common/Submitter/BaseClientSubmitter.cs | 310 +++++-------------
.../src/Unified/Services/Submitter/Service.cs | 47 +--
2 files changed, 103 insertions(+), 254 deletions(-)
diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs
index 01e76e78..b7d62aac 100644
--- a/Client/src/Common/Submitter/BaseClientSubmitter.cs
+++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs
@@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -26,7 +25,6 @@
using ArmoniK.Api.gRPC.V1;
using ArmoniK.DevelopmentKit.Client.Common.Status;
using ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt;
-using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.Utils;
@@ -109,13 +107,6 @@ protected BaseClientSubmitter(Properties properties,
///
public Session SessionId { get; }
- ///
- /// The channel pool to use for creating clients
- ///
- public ChannelPool ChannelPool
- => channelPool_ ??= ClientServiceConnector.ControlPlaneConnectionPool(properties_,
- LoggerFactory);
-
///
/// The logger to call the generate log in Seq
///
@@ -242,7 +233,6 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable
/// return the ids of the created tasks
- [PublicAPI]
private IEnumerable SubmitTaskChunkWithDependencies(IEnumerable>> payloadsWithDependencies,
int maxRetries,
TaskOptions? taskOptions = null)
@@ -257,62 +247,14 @@ private IEnumerable SubmitTaskChunkWithDependencies(IEnumerable info.TaskId);
- }
- catch (Exception e)
- {
- if (nbRetry >= maxRetries - 1)
- {
- throw;
- }
-
- switch (e)
- {
- case AggregateException
- {
- InnerException: RpcException,
- } ex:
- Logger.LogWarning(ex.InnerException,
- "Failure to submit");
- break;
- case AggregateException
- {
- InnerException: IOException,
- } ex:
- Logger.LogWarning(ex.InnerException,
- "IOException : Failure to submit, Retrying");
- break;
- case IOException ex:
- Logger.LogWarning(ex,
- "IOException Failure to submit");
- break;
- default:
- Logger.LogError(e,
- "Unknown failure :");
- throw;
- }
- }
-
- if (nbRetry > 0)
- {
- Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task",
- nbRetry,
- maxRetries);
- }
- }
-
- throw new Exception("Max retry to send has been reached");
+ return ArmoniKClient.SubmitTasksAsync(SessionId.Id,
+ // ReSharper disable once PossibleMultipleEnumeration
+ // Only occurs in case of retry
+ taskDefinitions,
+ taskOptions ?? TaskOptions,
+ maxRetries,
+ cancellationToken: CancellationToken.None)
+ .Result.Select(info => info.TaskId);
}
///
@@ -352,29 +294,15 @@ public void WaitForTasksCompletion(IEnumerable taskIds,
{
using var _ = Logger.LogFunction();
- // TODO: use RetryArmoniKClient instead of this code.
- Retry.WhileException(maxRetries,
- delayMs,
- retry =>
- {
- if (retry > 1)
- {
- Logger.LogWarning("Try {try} for {funcName}",
- retry,
- nameof(ArmoniKClient.WaitForCompletionAsync));
- }
-
- var __ = ArmoniKClient.WaitForCompletionAsync(SessionId.Id,
- taskIds.ToList(),
- true,
- true,
- 5,
- cancellationToken: CancellationToken.None)
- .Result;
- },
- true,
- typeof(IOException),
- typeof(RpcException));
+ ArmoniKClient.WaitForCompletionAsync(SessionId.Id,
+ taskIds.ToList(),
+ true,
+ true,
+ maxRetries,
+ maxRetries * delayMs / Math.Pow(2,
+ maxRetries),
+ CancellationToken.None)
+ .Wait();
}
///
@@ -400,32 +328,22 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds,
: Array.Empty();
// TODO: use RetryArmoniKClient instead of this code.
- var idStatuses = Retry.WhileException(5,
- 2000,
- retry =>
- {
- Logger.LogDebug("Try {try} for {funcName}",
- retry,
- nameof(ArmoniKClient.GetResultStatusAsync));
- return ArmoniKClient.GetResultStatusAsync(SessionId.Id,
- result2TaskDic.Keys,
- 5,
- cancellationToken: cancellationToken)
- .Result;
- },
- true,
- typeof(IOException),
- typeof(RpcException))
- .Where(status => status.TaskStatus != ArmoniKResultStatus.Unknown)
- .ToLookup(idStatus => idStatus.TaskStatus,
- idStatus =>
- {
- var taskId = result2TaskDic[idStatus.ResultId];
- result2TaskDic.Remove(idStatus.ResultId);
- return new ResultStatusData(idStatus.ResultId,
- taskId,
- idStatus.TaskStatus);
- });
+ var idStatuses = ArmoniKClient.GetResultStatusAsync(SessionId.Id,
+ result2TaskDic.Keys,
+ 5,
+ 5 * 2000 / Math.Pow(2,
+ 5),
+ cancellationToken)
+ .Result.Where(status => status.TaskStatus != ArmoniKResultStatus.Unknown)
+ .ToLookup(idStatus => idStatus.TaskStatus,
+ idStatus =>
+ {
+ var taskId = result2TaskDic[idStatus.ResultId];
+ result2TaskDic.Remove(idStatus.ResultId);
+ return new ResultStatusData(idStatus.ResultId,
+ taskId,
+ idStatus.TaskStatus);
+ });
var resultStatusList = new ResultStatusCollection(idStatuses[ArmoniKResultStatus.Available]
@@ -446,25 +364,12 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds,
/// The list of task ids.
/// A collection of map task results.
public IEnumerable GetResultIds(IEnumerable taskIds)
- => Retry.WhileException(5,
- 2000,
- retry =>
- {
- if (retry > 1)
- {
- Logger.LogWarning("Try {try} for {funcName}",
- retry,
- nameof(GetResultIds));
- }
-
- return ArmoniKClient.GetResultIdsAsync(taskIds.ToList(),
- 5,
- cancellationToken: CancellationToken.None)
- .Result;
- },
- true,
- typeof(IOException),
- typeof(RpcException));
+ => ArmoniKClient.GetResultIdsAsync(taskIds.ToList(),
+ 5,
+ 5 * 2000 / Math.Pow(2,
+ 5),
+ CancellationToken.None)
+ .Result;
///
@@ -487,28 +392,21 @@ public byte[] GetResult(string taskId,
.Single()
.OutputIds.Single();
- Retry.WhileException(5,
- 2000,
- retry =>
- {
- ArmoniKClient.WaitForAvailability(SessionId.Id,
- resultId,
- cancellationToken: cancellationToken)
- .Wait(cancellationToken);
- },
- true,
- typeof(IOException),
- typeof(RpcException));
-
- return Retry.WhileException(5,
- 200,
- _ => ArmoniKClient.DownloadResultAsync(SessionId.Id,
- resultId,
- cancellationToken: cancellationToken)
- .Result,
- true,
- typeof(IOException),
- typeof(RpcException))!;
+ ArmoniKClient.WaitForAvailability(SessionId.Id,
+ resultId,
+ 5,
+ 5 * 2000 / Math.Pow(2,
+ 5),
+ cancellationToken)
+ .Wait(cancellationToken);
+
+ return ArmoniKClient.DownloadResultAsync(SessionId.Id,
+ resultId,
+ 5,
+ 5 * 2000 / Math.Pow(2,
+ 5),
+ cancellationToken)
+ .Result!;
}
catch (Exception ex)
{
@@ -548,12 +446,36 @@ public IEnumerable> GetResults(IEnumerable taskIds
///
///
// TODO: return a compound type to avoid having a nullable that holds the information and return an empty array.
- [Obsolete]
public async Task TryGetResultAsync(ResultRequest resultRequest,
CancellationToken cancellationToken = default)
- => await ArmoniKClient.DownloadResultAsync(resultRequest.Session,
- resultRequest.ResultId,
- cancellationToken: cancellationToken);
+    {
+      try
+      {
+        return await ArmoniKClient.DownloadResultAsync(resultRequest.Session,
+                                                       resultRequest.ResultId,
+                                                       5,
+                                                       5 * 2000 / Math.Pow(2,
+                                                                           5),
+                                                       cancellationToken);
+      }
+      // `await` rethrows a faulted task's exception unwrapped, so the failures tolerated by the
+      // AggregateException handler below must be tolerated here too: they mean "no result", not an error.
+      catch (Exception e) when (e is RpcException
+                                     {
+                                       StatusCode: StatusCode.NotFound or StatusCode.Aborted or StatusCode.Cancelled,
+                                     } or KeyNotFoundException)
+      {
+        return null;
+      }
+      catch (AggregateException ae)
+      {
+        ae.Handle(exception => exception is RpcException
+                                            {
+                                              StatusCode: StatusCode.NotFound or StatusCode.Aborted or StatusCode.Cancelled,
+                                            } or KeyNotFoundException);
+        return null;
+      }
+    }
///
/// Try to find the result of One task. If there no result, the function return byte[0]
@@ -591,67 +513,13 @@ public IEnumerable> GetResults(IEnumerable taskIds
.Single()
.OutputIds.Single();
- var resultReply = Retry.WhileException(5,
- 2000,
- retry =>
- {
- if (retry > 1)
- {
- Logger.LogWarning("Try {try} for {funcName}",
- retry,
- "SubmitterService.TryGetResultAsync");
- }
-
- try
- {
- var response = ArmoniKClient.DownloadResultAsync(SessionId.Id,
- resultId,
- cancellationToken: cancellationToken)
- .Result;
- return response;
- }
- catch (AggregateException ex)
- {
- if (ex.InnerException == null)
- {
- throw;
- }
-
- var rpcException = ex.InnerException;
-
- switch (rpcException)
- {
- //Not yet available return from the tryGetResult
- case RpcException
- {
- StatusCode: StatusCode.NotFound,
- }:
- return null;
-
- //We lost the communication rethrow to retry :
- case RpcException
- {
- StatusCode: StatusCode.Unavailable,
- }:
- throw;
-
- case RpcException
- {
- StatusCode: StatusCode.Aborted or StatusCode.Cancelled,
- }:
-
- Logger.LogError(rpcException,
- "Error while trying to get a result: {error}",
- rpcException.Message);
- return null;
- default:
- throw;
- }
- }
- },
- true,
- typeof(IOException),
- typeof(RpcException));
+ var resultReply = TryGetResultAsync(new ResultRequest
+ {
+ ResultId = resultId,
+ Session = SessionId.Id,
+ },
+ cancellationToken)
+ .Result;
return resultReply;
}
diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs
index 9d505460..037fb249 100644
--- a/Client/src/Unified/Services/Submitter/Service.cs
+++ b/Client/src/Unified/Services/Submitter/Service.cs
@@ -16,7 +16,6 @@
using System;
using System.Collections.Generic;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -34,8 +33,6 @@
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.Utils;
-using Grpc.Core;
-
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
@@ -563,28 +560,13 @@ private void ProxyTryGetResults(IEnumerable ta
Logger.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);
responseHandler(resultStatusData.TaskId,
- Retry.WhileException(5,
- 2000,
- retry =>
- {
- if (retry > 1)
- {
- Logger.LogWarning("Try {try} for {funcName}",
- retry,
- nameof(SessionService.TryGetResultAsync));
- }
-
- return SessionService.TryGetResultAsync(new ResultRequest
- {
- ResultId = resultStatusData.ResultId,
- Session = SessionId,
- },
- CancellationToken.None)
- .Result;
- },
- true,
- typeof(IOException),
- typeof(RpcException))!);
+ SessionService.TryGetResultAsync(new ResultRequest
+ {
+ ResultId = resultStatusData.ResultId,
+ Session = SessionId,
+ },
+ CancellationToken.None)
+ .Result!);
}
catch (Exception e)
{
@@ -614,15 +596,14 @@ private void ProxyTryGetResults(IEnumerable ta
var taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);
- switch (taskStatus)
+ if (taskStatus == ArmonikTaskStatusCode.TaskCancelled)
+ {
+ details = $"Task {resultStatusData.TaskId} was canceled";
+ }
+ else
{
- case ArmonikTaskStatusCode.TaskCancelled:
- details = $"Task {resultStatusData.TaskId} was canceled";
- break;
- default:
- var outputInfo = SessionService.TryGetTaskError(resultStatusData.TaskId);
- details = outputInfo ?? "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
- break;
+ var outputInfo = SessionService.TryGetTaskError(resultStatusData.TaskId);
+ details = outputInfo ?? "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
}
Logger.LogDebug("Error handler for {taskId}, {taskStatus}: {details}",