Skip to content

Commit

Permalink
Deactivate legacy retry policies
Browse files Browse the repository at this point in the history
  • Loading branch information
wkirschenmann committed Aug 25, 2023
1 parent d8c22b5 commit 685d2e9
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 254 deletions.
310 changes: 89 additions & 221 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,7 +25,6 @@
using ArmoniK.Api.gRPC.V1;
using ArmoniK.DevelopmentKit.Client.Common.Status;
using ArmoniK.DevelopmentKit.Client.Common.Submitter.ApiExt;
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.Utils;

Expand Down Expand Up @@ -109,13 +107,6 @@ protected BaseClientSubmitter(Properties properties,
/// </summary>
public Session SessionId { get; }

/// <summary>
/// The channel pool to use for creating clients
/// </summary>
public ChannelPool ChannelPool
=> channelPool_ ??= ClientServiceConnector.ControlPlaneConnectionPool(properties_,
LoggerFactory);

/// <summary>
/// The logger to call the generate log in Seq
/// </summary>
Expand Down Expand Up @@ -242,7 +233,6 @@ public IEnumerable<string> SubmitTasksWithDependencies(IEnumerable<Tuple<byte[],
/// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker
/// </param>
/// <returns>return the ids of the created tasks</returns>
[PublicAPI]
private IEnumerable<string> SubmitTaskChunkWithDependencies(IEnumerable<Tuple<string, byte[], IList<string>>> payloadsWithDependencies,
int maxRetries,
TaskOptions? taskOptions = null)
Expand All @@ -257,62 +247,14 @@ private IEnumerable<string> SubmitTaskChunkWithDependencies(IEnumerable<Tuple<st
tuple.Item1,
}));

for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
{
try
{
return ArmoniKClient.SubmitTasksAsync(SessionId.Id,
// ReSharper disable once PossibleMultipleEnumeration
// Only occurs in case of retry
taskDefinitions,
taskOptions ?? TaskOptions,
5,
cancellationToken: CancellationToken.None)
.Result.Select(info => info.TaskId);
}
catch (Exception e)
{
if (nbRetry >= maxRetries - 1)
{
throw;
}

switch (e)
{
case AggregateException
{
InnerException: RpcException,
} ex:
Logger.LogWarning(ex.InnerException,
"Failure to submit");
break;
case AggregateException
{
InnerException: IOException,
} ex:
Logger.LogWarning(ex.InnerException,
"IOException : Failure to submit, Retrying");
break;
case IOException ex:
Logger.LogWarning(ex,
"IOException Failure to submit");
break;
default:
Logger.LogError(e,
"Unknown failure :");
throw;
}
}

if (nbRetry > 0)
{
Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit batch of task",
nbRetry,
maxRetries);
}
}

throw new Exception("Max retry to send has been reached");
return ArmoniKClient.SubmitTasksAsync(SessionId.Id,
// ReSharper disable once PossibleMultipleEnumeration
// Only occurs in case of retry
taskDefinitions,
taskOptions ?? TaskOptions,
maxRetries,
cancellationToken: CancellationToken.None)
.Result.Select(info => info.TaskId);
}

/// <summary>
Expand Down Expand Up @@ -352,29 +294,15 @@ public void WaitForTasksCompletion(IEnumerable<string> taskIds,
{
using var _ = Logger.LogFunction();

// TODO: use RetryArmoniKClient instead of this code.
Retry.WhileException(maxRetries,
delayMs,
retry =>
{
if (retry > 1)
{
Logger.LogWarning("Try {try} for {funcName}",
retry,
nameof(ArmoniKClient.WaitForCompletionAsync));
}

var __ = ArmoniKClient.WaitForCompletionAsync(SessionId.Id,
taskIds.ToList(),
true,
true,
5,
cancellationToken: CancellationToken.None)
.Result;
},
true,
typeof(IOException),
typeof(RpcException));
ArmoniKClient.WaitForCompletionAsync(SessionId.Id,
taskIds.ToList(),
true,
true,
maxRetries,
maxRetries * delayMs / Math.Pow(2,
maxRetries),
CancellationToken.None)
.Wait();
}

/// <summary>
Expand All @@ -400,32 +328,22 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
: Array.Empty<ResultStatusData>();

// TODO: use RetryArmoniKClient instead of this code.
var idStatuses = Retry.WhileException(5,
2000,
retry =>
{
Logger.LogDebug("Try {try} for {funcName}",
retry,
nameof(ArmoniKClient.GetResultStatusAsync));
return ArmoniKClient.GetResultStatusAsync(SessionId.Id,
result2TaskDic.Keys,
5,
cancellationToken: cancellationToken)
.Result;
},
true,
typeof(IOException),
typeof(RpcException))
.Where(status => status.TaskStatus != ArmoniKResultStatus.Unknown)
.ToLookup(idStatus => idStatus.TaskStatus,
idStatus =>
{
var taskId = result2TaskDic[idStatus.ResultId];
result2TaskDic.Remove(idStatus.ResultId);
return new ResultStatusData(idStatus.ResultId,
taskId,
idStatus.TaskStatus);
});
var idStatuses = ArmoniKClient.GetResultStatusAsync(SessionId.Id,
result2TaskDic.Keys,
5,
5 * 2000 / Math.Pow(2,
5),
cancellationToken)
.Result.Where(status => status.TaskStatus != ArmoniKResultStatus.Unknown)
.ToLookup(idStatus => idStatus.TaskStatus,
idStatus =>
{
var taskId = result2TaskDic[idStatus.ResultId];
result2TaskDic.Remove(idStatus.ResultId);
return new ResultStatusData(idStatus.ResultId,
taskId,
idStatus.TaskStatus);
});


var resultStatusList = new ResultStatusCollection(idStatuses[ArmoniKResultStatus.Available]
Expand All @@ -446,25 +364,12 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
/// <param name="taskIds">The list of task ids.</param>
/// <returns>A collection of map task results.</returns>
public IEnumerable<TaskOutputIds> GetResultIds(IEnumerable<string> taskIds)
=> Retry.WhileException(5,
2000,
retry =>
{
if (retry > 1)
{
Logger.LogWarning("Try {try} for {funcName}",
retry,
nameof(GetResultIds));
}

return ArmoniKClient.GetResultIdsAsync(taskIds.ToList(),
5,
cancellationToken: CancellationToken.None)
.Result;
},
true,
typeof(IOException),
typeof(RpcException));
=> ArmoniKClient.GetResultIdsAsync(taskIds.ToList(),
5,
5 * 2000 / Math.Pow(2,
5),
CancellationToken.None)
.Result;


/// <summary>
Expand All @@ -487,28 +392,21 @@ public byte[] GetResult(string taskId,
.Single()
.OutputIds.Single();

Retry.WhileException(5,
2000,
retry =>
{
ArmoniKClient.WaitForAvailability(SessionId.Id,
resultId,
cancellationToken: cancellationToken)
.Wait(cancellationToken);
},
true,
typeof(IOException),
typeof(RpcException));

return Retry.WhileException(5,
200,
_ => ArmoniKClient.DownloadResultAsync(SessionId.Id,
resultId,
cancellationToken: cancellationToken)
.Result,
true,
typeof(IOException),
typeof(RpcException))!;
ArmoniKClient.WaitForAvailability(SessionId.Id,
resultId,
5,
5 * 2000 / Math.Pow(2,
5),
cancellationToken)
.Wait(cancellationToken);

return ArmoniKClient.DownloadResultAsync(SessionId.Id,
resultId,
5,
5 * 2000 / Math.Pow(2,
5),
cancellationToken)
.Result!;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -548,12 +446,36 @@ public IEnumerable<Tuple<string, byte[]>> GetResults(IEnumerable<string> taskIds
/// <exception cref="Exception"></exception>
/// <exception cref="ArgumentOutOfRangeException"></exception>
// TODO: return a compound type to avoid having a nullable that holds the information and return an empty array.
[Obsolete]
public async Task<byte[]?> TryGetResultAsync(ResultRequest resultRequest,
CancellationToken cancellationToken = default)
=> await ArmoniKClient.DownloadResultAsync(resultRequest.Session,
resultRequest.ResultId,
cancellationToken: cancellationToken);
{
try
{
return await ArmoniKClient.DownloadResultAsync(resultRequest.Session,
resultRequest.ResultId,
5,
5 * 2000 / Math.Pow(2,
5),
cancellationToken);
}
catch (RpcException e)
{
if (e.StatusCode == StatusCode.NotFound)
{
return null;
}

throw;
}
catch (AggregateException ae)
{
ae.Handle(exception => exception is RpcException
{
StatusCode: StatusCode.NotFound or StatusCode.Aborted or StatusCode.Cancelled,
} or KeyNotFoundException);
return null;
}
}

/// <summary>
/// Try to find the result of One task. If there no result, the function return byte[0]
Expand Down Expand Up @@ -591,67 +513,13 @@ public IEnumerable<Tuple<string, byte[]>> GetResults(IEnumerable<string> taskIds
.Single()
.OutputIds.Single();

var resultReply = Retry.WhileException(5,
2000,
retry =>
{
if (retry > 1)
{
Logger.LogWarning("Try {try} for {funcName}",
retry,
"SubmitterService.TryGetResultAsync");
}

try
{
var response = ArmoniKClient.DownloadResultAsync(SessionId.Id,
resultId,
cancellationToken: cancellationToken)
.Result;
return response;
}
catch (AggregateException ex)
{
if (ex.InnerException == null)
{
throw;
}

var rpcException = ex.InnerException;

switch (rpcException)
{
//Not yet available return from the tryGetResult
case RpcException
{
StatusCode: StatusCode.NotFound,
}:
return null;

//We lost the communication rethrow to retry :
case RpcException
{
StatusCode: StatusCode.Unavailable,
}:
throw;

case RpcException
{
StatusCode: StatusCode.Aborted or StatusCode.Cancelled,
}:

Logger.LogError(rpcException,
"Error while trying to get a result: {error}",
rpcException.Message);
return null;
default:
throw;
}
}
},
true,
typeof(IOException),
typeof(RpcException));
var resultReply = TryGetResultAsync(new ResultRequest
{
ResultId = resultId,
Session = SessionId.Id,
},
cancellationToken)
.Result;

return resultReply;
}
Expand Down
Loading

0 comments on commit 685d2e9

Please sign in to comment.