From 01732876ac3f0b30a6a438a42cd3129395051245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Gurhem?= Date: Thu, 28 Sep 2023 11:06:44 +0200 Subject: [PATCH] refactor: remove gRPC Output type from ISubmitter and IWorkerStreamHandler --- Common/src/Pollster/TaskHandler.cs | 64 ++----- .../src/Stream/Worker/IWorkerStreamHandler.cs | 9 +- .../src/Stream/Worker/WorkerStreamHandler.cs | 38 +++- Common/src/gRPC/Services/ISubmitter.cs | 2 +- Common/src/gRPC/Services/Submitter.cs | 175 +++++++++--------- .../Helpers/ExceptionWorkerStreamHandler.cs | 9 +- Common/tests/Helpers/SimpleSubmitter.cs | 14 +- .../Helpers/SimpleWorkerStreamHandler.cs | 19 +- Common/tests/Pollster/PollsterTest.cs | 46 ++--- Common/tests/Pollster/TaskHandlerTest.cs | 8 +- .../IntegrationGrpcSubmitterServiceTest.cs | 6 +- 11 files changed, 187 insertions(+), 203 deletions(-) diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 1de35ab71..fc8f18de4 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -24,7 +24,6 @@ using ArmoniK.Api.Common.Utils; using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; using ArmoniK.Core.Common.Exceptions; using ArmoniK.Core.Common.gRPC.Services; @@ -36,7 +35,7 @@ using Microsoft.Extensions.Logging; -using Output = ArmoniK.Api.gRPC.V1.Output; +using Output = ArmoniK.Core.Common.Storage.Output; using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; namespace ArmoniK.Core.Common.Pollster; @@ -64,7 +63,7 @@ public sealed class TaskHandler : IAsyncDisposable private readonly CancellationTokenSource workerConnectionCts_; private readonly IWorkerStreamHandler workerStreamHandler_; private IAgent? agent_; - private ProcessReply? reply_; + private Output? output_; private SessionData? sessionData_; private TaskData? taskData_; @@ -380,13 +379,8 @@ await taskTable_.ReleaseTask(taskData_, taskData_.TaskId); await submitter_.CompleteTaskAsync(taskData_, true, - new Output - { - Error = new Output.Types.Error - { - Details = "Other pod seems to have crashed, resubmitting task", - }, - }, + new Output(false, + "Other pod seems to have crashed, resubmitting task"), CancellationToken.None) .ConfigureAwait(false); } @@ -557,30 +551,11 @@ await HandleErrorRequeueAsync(e, { // at this point worker requests should have ended logger_.LogDebug("Wait for task output"); - reply_ = await workerStreamHandler_.StartTaskProcessing(new ProcessRequest - { - CommunicationToken = token_, - Configuration = new Configuration - { - DataChunkMaxSize = PayloadConfiguration.MaxChunkSize, - }, - DataDependencies = - { - taskData_.DataDependencies, - }, - DataFolder = folder_, - ExpectedOutputKeys = - { - taskData_.ExpectedOutputIds, - }, - PayloadId = taskData_.PayloadId, - SessionId = taskData_.SessionId, - TaskId = taskData_.TaskId, - TaskOptions = taskData_.Options.ToGrpcTaskOptions(), - }, - taskData_.Options.MaxDuration, - workerConnectionCts_.Token) - .ConfigureAwait(false); + output_ = await workerStreamHandler_.StartTaskProcessing(taskData_, + token_, + folder_, + workerConnectionCts_.Token) + .ConfigureAwait(false); logger_.LogDebug("Stop agent server"); await agentHandler_.Stop(workerConnectionCts_.Token) @@ -614,9 +589,9 @@ public async Task PostProcessing() throw new NullReferenceException(nameof(agent_) + " is null."); } - if (reply_ is null) + if (output_ is null) { - throw new NullReferenceException(nameof(reply_) + " is null."); + throw new NullReferenceException(nameof(output_) + " is null."); } using var _ = logger_.BeginNamedScope("PostProcessing", @@ -626,10 +601,10 @@ public async Task PostProcessing() try { - logger_.LogInformation("Process task output of type {type}", - reply_.Output.TypeCase); + logger_.LogInformation("Process task output is {type}", + output_.Success); - if (reply_.Output.TypeCase is Output.TypeOneofCase.Ok) + if (output_.Success) { logger_.LogDebug("Complete processing of the request"); await agent_.FinalizeTaskCreation(CancellationToken.None) @@ -638,7 +613,7 @@ await agent_.FinalizeTaskCreation(CancellationToken.None) await submitter_.CompleteTaskAsync(taskData_, false, - reply_.Output, + output_, CancellationToken.None) .ConfigureAwait(false); messageHandler_.Status = QueueMessageStatus.Processed; @@ -726,13 +701,8 @@ await taskTable_.ReleaseTask(taskData, await submitter_.CompleteTaskAsync(taskData, resubmit, - new Output - { - Error = new Output.Types.Error - { - Details = e.Message, - }, - }, + new Output(false, + e.Message), CancellationToken.None) .ConfigureAwait(false); diff --git a/Common/src/Stream/Worker/IWorkerStreamHandler.cs b/Common/src/Stream/Worker/IWorkerStreamHandler.cs index 7632a9aa7..c57b4f9b3 100644 --- a/Common/src/Stream/Worker/IWorkerStreamHandler.cs +++ b/Common/src/Stream/Worker/IWorkerStreamHandler.cs @@ -19,8 +19,8 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; +using ArmoniK.Core.Common.Storage; using JetBrains.Annotations; @@ -29,7 +29,8 @@ namespace ArmoniK.Core.Common.Stream.Worker; [PublicAPI] public interface IWorkerStreamHandler : IInitializable, IDisposable { - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken); + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken); } diff --git a/Common/src/Stream/Worker/WorkerStreamHandler.cs b/Common/src/Stream/Worker/WorkerStreamHandler.cs index 30a1c7108..c2eab5d14 100644 --- a/Common/src/Stream/Worker/WorkerStreamHandler.cs +++ b/Common/src/Stream/Worker/WorkerStreamHandler.cs @@ -25,12 +25,15 @@ using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; using ArmoniK.Core.Common.Injection.Options; +using ArmoniK.Core.Common.Storage; using Grpc.Core; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Logging; +using Output = ArmoniK.Core.Common.Storage.Output; + namespace ArmoniK.Core.Common.Stream.Worker; public class WorkerStreamHandler : IWorkerStreamHandler @@ -129,19 +132,40 @@ public async Task Check(HealthCheckTag tag) public void Dispose() => GC.SuppressFinalize(this); - public async Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public async Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) { if (workerClient_ == null) { throw new ArmoniKException("Worker client should be initialized"); } - return await workerClient_.ProcessAsync(request, - deadline: DateTime.UtcNow + duration, - cancellationToken: cancellationToken) - .ConfigureAwait(false); + return (await workerClient_.ProcessAsync(new ProcessRequest + { + CommunicationToken = token, + Configuration = new Configuration + { + DataChunkMaxSize = PayloadConfiguration.MaxChunkSize, + }, + DataDependencies = + { + taskData.DataDependencies, + }, + DataFolder = dataFolder, + ExpectedOutputKeys = + { + taskData.ExpectedOutputIds, + }, + PayloadId = taskData.PayloadId, + SessionId = taskData.SessionId, + TaskId = taskData.TaskId, + TaskOptions = taskData.Options.ToGrpcTaskOptions(), + }, + deadline: DateTime.UtcNow + taskData.Options.MaxDuration, + cancellationToken: cancellationToken) + .ConfigureAwait(false)).Output; } private Task CheckWorker(CancellationToken cancellationToken) diff --git a/Common/src/gRPC/Services/ISubmitter.cs b/Common/src/gRPC/Services/ISubmitter.cs index fa71dfdaa..2510ff69a 100644 --- a/Common/src/gRPC/Services/ISubmitter.cs +++ b/Common/src/gRPC/Services/ISubmitter.cs @@ -26,7 +26,7 @@ using Grpc.Core; -using Output = ArmoniK.Api.gRPC.V1.Output; +using Output = ArmoniK.Core.Common.Storage.Output; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; namespace ArmoniK.Core.Common.gRPC.Services; diff --git a/Common/src/gRPC/Services/Submitter.cs b/Common/src/gRPC/Services/Submitter.cs index 0d809f940..5ef0d1f5d 100644 --- a/Common/src/gRPC/Services/Submitter.cs +++ b/Common/src/gRPC/Services/Submitter.cs @@ -40,7 +40,7 @@ using Microsoft.Extensions.Logging; -using Output = ArmoniK.Api.gRPC.V1.Output; +using Output = ArmoniK.Core.Common.Storage.Output; using ResultStatus = ArmoniK.Core.Common.Storage.ResultStatus; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; @@ -336,93 +336,6 @@ await Task.Delay(currentPollingDelay, return output; } - /// - public async Task CompleteTaskAsync(TaskData taskData, - bool resubmit, - Output output, - CancellationToken cancellationToken = default) - { - using var activity = activitySource_.StartActivity($"{nameof(CompleteTaskAsync)}"); - - Storage.Output cOutput = output; - var taskDataEnd = taskData with - { - EndDate = DateTime.UtcNow, - CreationToEndDuration = DateTime.UtcNow - taskData.CreationDate, - ProcessingToEndDuration = DateTime.UtcNow - taskData.StartDate, - }; - - if (cOutput.Success) - { - await taskTable_.SetTaskSuccessAsync(taskDataEnd, - cancellationToken) - .ConfigureAwait(false); - - logger_.LogInformation("Remove input payload of {task}", - taskData.TaskId); - - //Discard value is used to remove warnings CS4014 !! - _ = Task.Factory.StartNew(async () => await objectStorage_.TryDeleteAsync(taskData.TaskId, - CancellationToken.None) - .ConfigureAwait(false), - cancellationToken); - } - else - { - // TODO FIXME: nothing will resubmit the task if there is a crash there - if (resubmit && taskData.RetryOfIds.Count < taskData.Options.MaxRetries) - { - // not done means that another pod put this task in retry so we do not need to do it a second time - // so nothing to do - if (!await taskTable_.SetTaskRetryAsync(taskDataEnd, - cOutput.Error, - cancellationToken) - .ConfigureAwait(false)) - { - return; - } - - logger_.LogWarning("Resubmit {task}", - taskData.TaskId); - - var newTaskId = await taskTable_.RetryTask(taskData, - cancellationToken) - .ConfigureAwait(false); - - await FinalizeTaskCreation(new List - { - new(newTaskId, - taskData.PayloadId, - taskData.Options, - taskData.ExpectedOutputIds, - taskData.DataDependencies), - }, - taskData.SessionId, - taskData.TaskId, - cancellationToken) - .ConfigureAwait(false); - } - else - { - // not done means that another pod put this task in error so we do not need to do it a second time - // so nothing to do - if (!await taskTable_.SetTaskErrorAsync(taskDataEnd, - cOutput.Error, - cancellationToken) - .ConfigureAwait(false)) - { - return; - } - - await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, - resultTable_, - taskData.TaskId, - CancellationToken.None) - .ConfigureAwait(false); - } - } - } - /// [SuppressMessage("Usage", "CA2208:Instantiate argument exceptions correctly", @@ -580,4 +493,90 @@ await TaskLifeCycleHelper.CreateTasks(taskTable_, return requests; } + + /// + public async Task CompleteTaskAsync(TaskData taskData, + bool resubmit, + Output output, + CancellationToken cancellationToken = default) + { + using var activity = activitySource_.StartActivity($"{nameof(CompleteTaskAsync)}"); + + var taskDataEnd = taskData with + { + EndDate = DateTime.UtcNow, + CreationToEndDuration = DateTime.UtcNow - taskData.CreationDate, + ProcessingToEndDuration = DateTime.UtcNow - taskData.StartDate, + }; + + if (output.Success) + { + await taskTable_.SetTaskSuccessAsync(taskDataEnd, + cancellationToken) + .ConfigureAwait(false); + + logger_.LogInformation("Remove input payload of {task}", + taskData.TaskId); + + //Discard value is used to remove warnings CS4014 !! + _ = Task.Factory.StartNew(async () => await objectStorage_.TryDeleteAsync(taskData.TaskId, + CancellationToken.None) + .ConfigureAwait(false), + cancellationToken); + } + else + { + // TODO FIXME: nothing will resubmit the task if there is a crash there + if (resubmit && taskData.RetryOfIds.Count < taskData.Options.MaxRetries) + { + // not done means that another pod put this task in retry so we do not need to do it a second time + // so nothing to do + if (!await taskTable_.SetTaskRetryAsync(taskDataEnd, + output.Error, + cancellationToken) + .ConfigureAwait(false)) + { + return; + } + + logger_.LogWarning("Resubmit {task}", + taskData.TaskId); + + var newTaskId = await taskTable_.RetryTask(taskData, + cancellationToken) + .ConfigureAwait(false); + + await FinalizeTaskCreation(new List + { + new(newTaskId, + taskData.PayloadId, + taskData.Options, + taskData.ExpectedOutputIds, + taskData.DataDependencies), + }, + taskData.SessionId, + taskData.TaskId, + cancellationToken) + .ConfigureAwait(false); + } + else + { + // not done means that another pod put this task in error so we do not need to do it a second time + // so nothing to do + if (!await taskTable_.SetTaskErrorAsync(taskDataEnd, + output.Error, + cancellationToken) + .ConfigureAwait(false)) + { + return; + } + + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + taskData.TaskId, + CancellationToken.None) + .ConfigureAwait(false); + } + } + } } diff --git a/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs b/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs index 10369436b..5d0f2b7e6 100644 --- a/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs +++ b/Common/tests/Helpers/ExceptionWorkerStreamHandler.cs @@ -19,8 +19,8 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base.DataStructures; +using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; using Microsoft.Extensions.Diagnostics.HealthChecks; @@ -45,9 +45,10 @@ public void Dispose() { } - public async Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public async Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) { await Task.Delay(TimeSpan.FromMilliseconds(delay_), cancellationToken) diff --git a/Common/tests/Helpers/SimpleSubmitter.cs b/Common/tests/Helpers/SimpleSubmitter.cs index 79dd6ab3f..223b7a688 100644 --- a/Common/tests/Helpers/SimpleSubmitter.cs +++ b/Common/tests/Helpers/SimpleSubmitter.cs @@ -28,7 +28,7 @@ using Grpc.Core; -using Output = ArmoniK.Api.gRPC.V1.Output; +using Output = ArmoniK.Core.Common.Storage.Output; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -113,12 +113,6 @@ public Task WaitForCompletion(WaitRequest request, CancellationToken cancellationToken) => Task.FromResult(DefaultCount); - public Task CompleteTaskAsync(TaskData taskData, - bool resubmit, - Output output, - CancellationToken cancellationToken = default) - => Task.CompletedTask; - public Task WaitForAvailabilityAsync(ResultRequest request, CancellationToken contextCancellationToken) => Task.FromResult(new AvailabilityReply @@ -132,4 +126,10 @@ public Task SetResult(string sessionId, IAsyncEnumerable> chunks, CancellationToken cancellationToken) => Task.CompletedTask; + + public Task CompleteTaskAsync(TaskData taskData, + bool resubmit, + Output output, + CancellationToken cancellationToken = default) + => Task.CompletedTask; } diff --git a/Common/tests/Helpers/SimpleWorkerStreamHandler.cs b/Common/tests/Helpers/SimpleWorkerStreamHandler.cs index 448d1989d..cebf030e5 100644 --- a/Common/tests/Helpers/SimpleWorkerStreamHandler.cs +++ b/Common/tests/Helpers/SimpleWorkerStreamHandler.cs @@ -19,9 +19,8 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base.DataStructures; +using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Stream.Worker; using Microsoft.Extensions.Diagnostics.HealthChecks; @@ -39,14 +38,10 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) - => Task.FromResult(new ProcessReply - { - Output = new Output - { - Ok = new Empty(), - }, - }); + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) + => Task.FromResult(new Output(true, + "")); } diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index 042dd18d2..cdc4d6310 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -23,8 +23,6 @@ using System.Threading; using System.Threading.Tasks; -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Exceptions; @@ -41,10 +39,6 @@ using NUnit.Framework; -using Output = ArmoniK.Api.gRPC.V1.Output; -using ResultStatus = ArmoniK.Core.Common.Storage.ResultStatus; -using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; -using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; namespace ArmoniK.Core.Common.Tests.Pollster; @@ -214,9 +208,10 @@ public void Dispose() { } - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) => throw new NotImplementedException(); } @@ -245,9 +240,10 @@ public void Dispose() { } - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) => throw new NotImplementedException(); } @@ -421,19 +417,15 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) { Task.Delay(TimeSpan.FromMilliseconds(delay_), cancellationToken); - return Task.FromResult(new ProcessReply - { - Output = new Output - { - Ok = new Empty(), - }, - }); + return Task.FromResult(new Output(true, + "")); } } @@ -567,8 +559,9 @@ public static IEnumerable ExecuteTooManyErrorShouldFailTestCase { // Failing WorkerStreamHandler var mockStreamHandlerFail = new Mock(); - mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), - It.IsAny(), + mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new ApplicationException("Failed WorkerStreamHandler")); yield return new TestCaseData(mockStreamHandlerFail, @@ -639,8 +632,9 @@ public async Task UnavailableWorkerShouldFail() var simpleAgentHandler = new SimpleAgentHandler(); var mockStreamHandlerFail = new Mock(); - mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), - It.IsAny(), + mockStreamHandlerFail.Setup(streamHandler => streamHandler.StartTaskProcessing(It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new TestUnavailableRpcException("Unavailable worker")); diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index b5b6cfab3..0417a1c75 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -26,7 +26,6 @@ using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Agent; using ArmoniK.Api.gRPC.V1.Submitter; -using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.Core.Base; using ArmoniK.Core.Base.DataStructures; using ArmoniK.Core.Common.Pollster; @@ -1016,9 +1015,10 @@ public Task Init(CancellationToken cancellationToken) public void Dispose() => GC.SuppressFinalize(this); - public Task StartTaskProcessing(ProcessRequest request, - TimeSpan duration, - CancellationToken cancellationToken) + public Task StartTaskProcessing(TaskData taskData, + string token, + string dataFolder, + CancellationToken cancellationToken) => throw new T(); } diff --git a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs index 6ec54fbbd..539d2d582 100644 --- a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs +++ b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs @@ -47,7 +47,7 @@ using NUnit.Framework; using Empty = ArmoniK.Api.gRPC.V1.Empty; -using Output = ArmoniK.Api.gRPC.V1.Output; +using Output = ArmoniK.Core.Common.Storage.Output; using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus; @@ -924,8 +924,8 @@ public Task RemoveRemainingDataDependenciesAsync(ICollection taskId, CancellationToken cancellationToken = default) => throw new T(); - public Task GetTaskOutput(string taskId, - CancellationToken cancellationToken = default) + public Task GetTaskOutput(string taskId, + CancellationToken cancellationToken = default) => throw new T(); public Task AcquireTask(TaskData taskData,