Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove gRPC Output type from ISubmitter and IWorkerStreamHandler #510

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 17 additions & 47 deletions Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.Core.Base;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.gRPC.Services;
Expand All @@ -36,7 +35,7 @@

using Microsoft.Extensions.Logging;

using Output = ArmoniK.Api.gRPC.V1.Output;
using Output = ArmoniK.Core.Common.Storage.Output;
using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus;

namespace ArmoniK.Core.Common.Pollster;
Expand Down Expand Up @@ -64,7 +63,7 @@ public sealed class TaskHandler : IAsyncDisposable
private readonly CancellationTokenSource workerConnectionCts_;
private readonly IWorkerStreamHandler workerStreamHandler_;
private IAgent? agent_;
private ProcessReply? reply_;
private Output? output_;
private SessionData? sessionData_;
private TaskData? taskData_;

Expand Down Expand Up @@ -380,13 +379,8 @@ await taskTable_.ReleaseTask(taskData_,
taskData_.TaskId);
await submitter_.CompleteTaskAsync(taskData_,
true,
new Output
{
Error = new Output.Types.Error
{
Details = "Other pod seems to have crashed, resubmitting task",
},
},
new Output(false,
"Other pod seems to have crashed, resubmitting task"),
CancellationToken.None)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -557,30 +551,11 @@ await HandleErrorRequeueAsync(e,
{
// at this point worker requests should have ended
logger_.LogDebug("Wait for task output");
reply_ = await workerStreamHandler_.StartTaskProcessing(new ProcessRequest
{
CommunicationToken = token_,
Configuration = new Configuration
{
DataChunkMaxSize = PayloadConfiguration.MaxChunkSize,
},
DataDependencies =
{
taskData_.DataDependencies,
},
DataFolder = folder_,
ExpectedOutputKeys =
{
taskData_.ExpectedOutputIds,
},
PayloadId = taskData_.PayloadId,
SessionId = taskData_.SessionId,
TaskId = taskData_.TaskId,
TaskOptions = taskData_.Options.ToGrpcTaskOptions(),
},
taskData_.Options.MaxDuration,
workerConnectionCts_.Token)
.ConfigureAwait(false);
output_ = await workerStreamHandler_.StartTaskProcessing(taskData_,
token_,
folder_,
workerConnectionCts_.Token)
.ConfigureAwait(false);

logger_.LogDebug("Stop agent server");
await agentHandler_.Stop(workerConnectionCts_.Token)
Expand Down Expand Up @@ -614,9 +589,9 @@ public async Task PostProcessing()
throw new NullReferenceException(nameof(agent_) + " is null.");
}

if (reply_ is null)
if (output_ is null)
{
throw new NullReferenceException(nameof(reply_) + " is null.");
throw new NullReferenceException(nameof(output_) + " is null.");
}

using var _ = logger_.BeginNamedScope("PostProcessing",
Expand All @@ -626,10 +601,10 @@ public async Task PostProcessing()

try
{
logger_.LogInformation("Process task output of type {type}",
reply_.Output.TypeCase);
logger_.LogInformation("Process task output is {type}",
output_.Success);

if (reply_.Output.TypeCase is Output.TypeOneofCase.Ok)
if (output_.Success)
{
logger_.LogDebug("Complete processing of the request");
await agent_.FinalizeTaskCreation(CancellationToken.None)
Expand All @@ -638,7 +613,7 @@ await agent_.FinalizeTaskCreation(CancellationToken.None)

await submitter_.CompleteTaskAsync(taskData_,
false,
reply_.Output,
output_,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Processed;
Expand Down Expand Up @@ -726,13 +701,8 @@ await taskTable_.ReleaseTask(taskData,

await submitter_.CompleteTaskAsync(taskData,
resubmit,
new Output
{
Error = new Output.Types.Error
{
Details = e.Message,
},
},
new Output(false,
e.Message),
CancellationToken.None)
.ConfigureAwait(false);

Expand Down
9 changes: 5 additions & 4 deletions Common/src/Stream/Worker/IWorkerStreamHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.Core.Base;
using ArmoniK.Core.Common.Storage;

using JetBrains.Annotations;

Expand All @@ -29,7 +29,8 @@ namespace ArmoniK.Core.Common.Stream.Worker;
[PublicAPI]
public interface IWorkerStreamHandler : IInitializable, IDisposable
{
public Task<ProcessReply> StartTaskProcessing(ProcessRequest request,
TimeSpan duration,
CancellationToken cancellationToken);
public Task<Output> StartTaskProcessing(TaskData taskData,
string token,
string dataFolder,
CancellationToken cancellationToken);
}
38 changes: 31 additions & 7 deletions Common/src/Stream/Worker/WorkerStreamHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Injection.Options;
using ArmoniK.Core.Common.Storage;

using Grpc.Core;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

using Output = ArmoniK.Core.Common.Storage.Output;

namespace ArmoniK.Core.Common.Stream.Worker;

public class WorkerStreamHandler : IWorkerStreamHandler
Expand Down Expand Up @@ -129,19 +132,40 @@ public async Task<HealthCheckResult> Check(HealthCheckTag tag)
public void Dispose()
=> GC.SuppressFinalize(this);

public async Task<ProcessReply> StartTaskProcessing(ProcessRequest request,
TimeSpan duration,
CancellationToken cancellationToken)
public async Task<Output> StartTaskProcessing(TaskData taskData,
string token,
string dataFolder,
CancellationToken cancellationToken)
{
if (workerClient_ == null)
{
throw new ArmoniKException("Worker client should be initialized");
}

return await workerClient_.ProcessAsync(request,
deadline: DateTime.UtcNow + duration,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
return (await workerClient_.ProcessAsync(new ProcessRequest
{
CommunicationToken = token,
Configuration = new Configuration
{
DataChunkMaxSize = PayloadConfiguration.MaxChunkSize,
},
DataDependencies =
{
taskData.DataDependencies,
},
DataFolder = dataFolder,
ExpectedOutputKeys =
{
taskData.ExpectedOutputIds,
},
PayloadId = taskData.PayloadId,
SessionId = taskData.SessionId,
TaskId = taskData.TaskId,
TaskOptions = taskData.Options.ToGrpcTaskOptions(),
},
deadline: DateTime.UtcNow + taskData.Options.MaxDuration,
cancellationToken: cancellationToken)
.ConfigureAwait(false)).Output;
}

private Task<bool> CheckWorker(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion Common/src/gRPC/Services/ISubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

using Grpc.Core;

using Output = ArmoniK.Api.gRPC.V1.Output;
using Output = ArmoniK.Core.Common.Storage.Output;
using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions;

namespace ArmoniK.Core.Common.gRPC.Services;
Expand Down
Loading