Skip to content

Commit

Permalink
refactor: remove gRPC Output type from ISubmitter and IWorkerStreamHa…
Browse files Browse the repository at this point in the history
…ndler
  • Loading branch information
aneojgurhem committed Sep 28, 2023
1 parent 7c98a6e commit 0173287
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 203 deletions.
64 changes: 17 additions & 47 deletions Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.Core.Base;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.gRPC.Services;
Expand All @@ -36,7 +35,7 @@

using Microsoft.Extensions.Logging;

using Output = ArmoniK.Api.gRPC.V1.Output;
using Output = ArmoniK.Core.Common.Storage.Output;
using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus;

namespace ArmoniK.Core.Common.Pollster;
Expand Down Expand Up @@ -64,7 +63,7 @@ public sealed class TaskHandler : IAsyncDisposable
private readonly CancellationTokenSource workerConnectionCts_;
private readonly IWorkerStreamHandler workerStreamHandler_;
private IAgent? agent_;
private ProcessReply? reply_;
private Output? output_;
private SessionData? sessionData_;
private TaskData? taskData_;

Expand Down Expand Up @@ -380,13 +379,8 @@ await taskTable_.ReleaseTask(taskData_,
taskData_.TaskId);
await submitter_.CompleteTaskAsync(taskData_,
true,
new Output
{
Error = new Output.Types.Error
{
Details = "Other pod seems to have crashed, resubmitting task",
},
},
new Output(false,
"Other pod seems to have crashed, resubmitting task"),
CancellationToken.None)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -557,30 +551,11 @@ await HandleErrorRequeueAsync(e,
{
// at this point worker requests should have ended
logger_.LogDebug("Wait for task output");
reply_ = await workerStreamHandler_.StartTaskProcessing(new ProcessRequest
{
CommunicationToken = token_,
Configuration = new Configuration
{
DataChunkMaxSize = PayloadConfiguration.MaxChunkSize,
},
DataDependencies =
{
taskData_.DataDependencies,
},
DataFolder = folder_,
ExpectedOutputKeys =
{
taskData_.ExpectedOutputIds,
},
PayloadId = taskData_.PayloadId,
SessionId = taskData_.SessionId,
TaskId = taskData_.TaskId,
TaskOptions = taskData_.Options.ToGrpcTaskOptions(),
},
taskData_.Options.MaxDuration,
workerConnectionCts_.Token)
.ConfigureAwait(false);
output_ = await workerStreamHandler_.StartTaskProcessing(taskData_,
token_,
folder_,
workerConnectionCts_.Token)
.ConfigureAwait(false);

logger_.LogDebug("Stop agent server");
await agentHandler_.Stop(workerConnectionCts_.Token)
Expand Down Expand Up @@ -614,9 +589,9 @@ public async Task PostProcessing()
throw new NullReferenceException(nameof(agent_) + " is null.");
}

if (reply_ is null)
if (output_ is null)
{
throw new NullReferenceException(nameof(reply_) + " is null.");
throw new NullReferenceException(nameof(output_) + " is null.");
}

using var _ = logger_.BeginNamedScope("PostProcessing",
Expand All @@ -626,10 +601,10 @@ public async Task PostProcessing()

try
{
logger_.LogInformation("Process task output of type {type}",
reply_.Output.TypeCase);
logger_.LogInformation("Process task output is {type}",
output_.Success);

if (reply_.Output.TypeCase is Output.TypeOneofCase.Ok)
if (output_.Success)
{
logger_.LogDebug("Complete processing of the request");
await agent_.FinalizeTaskCreation(CancellationToken.None)
Expand All @@ -638,7 +613,7 @@ await agent_.FinalizeTaskCreation(CancellationToken.None)

await submitter_.CompleteTaskAsync(taskData_,
false,
reply_.Output,
output_,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Processed;
Expand Down Expand Up @@ -726,13 +701,8 @@ await taskTable_.ReleaseTask(taskData,

await submitter_.CompleteTaskAsync(taskData,
resubmit,
new Output
{
Error = new Output.Types.Error
{
Details = e.Message,
},
},
new Output(false,
e.Message),
CancellationToken.None)
.ConfigureAwait(false);

Expand Down
9 changes: 5 additions & 4 deletions Common/src/Stream/Worker/IWorkerStreamHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.Core.Base;
using ArmoniK.Core.Common.Storage;

using JetBrains.Annotations;

Expand All @@ -29,7 +29,8 @@ namespace ArmoniK.Core.Common.Stream.Worker;
[PublicAPI]
public interface IWorkerStreamHandler : IInitializable, IDisposable
{
public Task<ProcessReply> StartTaskProcessing(ProcessRequest request,
TimeSpan duration,
CancellationToken cancellationToken);
public Task<Output> StartTaskProcessing(TaskData taskData,
string token,
string dataFolder,
CancellationToken cancellationToken);
}
38 changes: 31 additions & 7 deletions Common/src/Stream/Worker/WorkerStreamHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Injection.Options;
using ArmoniK.Core.Common.Storage;

using Grpc.Core;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

using Output = ArmoniK.Core.Common.Storage.Output;

namespace ArmoniK.Core.Common.Stream.Worker;

public class WorkerStreamHandler : IWorkerStreamHandler
Expand Down Expand Up @@ -129,19 +132,40 @@ public async Task<HealthCheckResult> Check(HealthCheckTag tag)
public void Dispose()
=> GC.SuppressFinalize(this);

public async Task<ProcessReply> StartTaskProcessing(ProcessRequest request,
TimeSpan duration,
CancellationToken cancellationToken)
public async Task<Output> StartTaskProcessing(TaskData taskData,
string token,
string dataFolder,
CancellationToken cancellationToken)
{
if (workerClient_ == null)
{
throw new ArmoniKException("Worker client should be initialized");
}

return await workerClient_.ProcessAsync(request,
deadline: DateTime.UtcNow + duration,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
return (await workerClient_.ProcessAsync(new ProcessRequest
{
CommunicationToken = token,
Configuration = new Configuration
{
DataChunkMaxSize = PayloadConfiguration.MaxChunkSize,
},
DataDependencies =
{
taskData.DataDependencies,
},
DataFolder = dataFolder,
ExpectedOutputKeys =
{
taskData.ExpectedOutputIds,
},
PayloadId = taskData.PayloadId,
SessionId = taskData.SessionId,
TaskId = taskData.TaskId,
TaskOptions = taskData.Options.ToGrpcTaskOptions(),
},
deadline: DateTime.UtcNow + taskData.Options.MaxDuration,
cancellationToken: cancellationToken)
.ConfigureAwait(false)).Output;
}

private Task<bool> CheckWorker(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion Common/src/gRPC/Services/ISubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

using Grpc.Core;

using Output = ArmoniK.Api.gRPC.V1.Output;
using Output = ArmoniK.Core.Common.Storage.Output;
using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions;

namespace ArmoniK.Core.Common.gRPC.Services;
Expand Down
Loading

0 comments on commit 0173287

Please sign in to comment.