Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: during acquisition, a condition was always true #588

Merged
merged 4 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Common/src/Pollster/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ await taskHandler.DisposeAsync()
var precondition = await taskHandler.AcquireTask()
.ConfigureAwait(false);

if (precondition)
if (precondition == 0)
{
try
{
Expand Down
52 changes: 27 additions & 25 deletions Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ public async Task StopCancelledTask()
/// Acquisition of the task in the message given to the constructor
/// </summary>
/// <returns>
/// Bool representing whether the task has been acquired
/// Integer representing whether the task has been acquired
/// Acquired when return is 0
/// </returns>
/// <exception cref="ArgumentException">status of the task is not recognized</exception>
public async Task<bool> AcquireTask()
public async Task<int> AcquireTask()
lemaitre-aneo marked this conversation as resolved.
Show resolved Hide resolved
{
using var activity = activitySource_.StartActivity($"{nameof(AcquireTask)}");
using var _ = logger_.BeginNamedScope("Acquiring task",
Expand All @@ -197,7 +198,7 @@ public async Task<bool> AcquireTask()
{
messageHandler_.Status = QueueMessageStatus.Postponed;
logger_.LogDebug("Task data read but execution cancellation requested");
return false;
return 1;
}

/*
Expand Down Expand Up @@ -230,15 +231,15 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
CancellationToken.None)
.ConfigureAwait(false);

return false;
return 2;
case TaskStatus.Completed:
logger_.LogInformation("Task was already completed");
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 3;
case TaskStatus.Creating:
logger_.LogInformation("Task is still creating");
messageHandler_.Status = QueueMessageStatus.Postponed;
return false;
return 4;
case TaskStatus.Submitted:
break;
case TaskStatus.Dispatched:
Expand All @@ -251,7 +252,7 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
messageHandler_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
return false;
return 5;
case TaskStatus.Timeout:
logger_.LogInformation("Task was timeout elsewhere ; taking over here");
break;
Expand All @@ -263,7 +264,7 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
messageHandler_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
return false;
return 6;
case TaskStatus.Processing:

// If OwnerPodId is empty, it means that task was partially started or released
Expand All @@ -279,7 +280,7 @@ await submitter_.CompleteTaskAsync(taskData_,
$"Other pod seems to have released task while keeping {taskData_.Status} status, resubmitting task"),
CancellationToken.None)
.ConfigureAwait(false);
return false;
return 7;
}

// we check if the task was acquired by this pod
Expand Down Expand Up @@ -316,23 +317,23 @@ await submitter_.CompleteTaskAsync(taskData_,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 8;
}
}
// task is processing elsewhere so message is duplicated
else
{
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 9;
}
}

logger_.LogWarning("Task already in processing on this pod. This scenario should be managed earlier. Message likely duplicated. Removing it from queue");
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 10;
case TaskStatus.Retried:
logger_.LogInformation("Task is in retry ; retry task should be executed");
return false;
return 11;
case TaskStatus.Unspecified:
default:
logger_.LogCritical("Task was in an unknown state {state}",
Expand Down Expand Up @@ -365,14 +366,14 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
CancellationToken.None)
.ConfigureAwait(false);

return false;
return 12;
}

if (cancellationTokenSource_.IsCancellationRequested)
{
messageHandler_.Status = QueueMessageStatus.Postponed;
logger_.LogDebug("Session running but execution cancellation requested");
return false;
return 13;
}

taskData_ = taskData_ with
Expand All @@ -393,14 +394,15 @@ await taskTable_.ReleaseTask(taskData_,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Postponed;
return false;
return 14;
}

// empty OwnerPodId means that the task was not acquired because not ready
if (taskData_.OwnerPodId == "")
{
logger_.LogDebug("Task acquired but not ready (empty owner pod id)");
messageHandler_.Status = QueueMessageStatus.Postponed;
return false;
return 15;
}

// we check if the task was acquired by this pod
Expand All @@ -426,12 +428,12 @@ await taskTable_.ReleaseTask(taskData_,
logger_.LogInformation("Task is not running on the other polling agent, status : {status}",
taskData_.Status);

if (taskData_.Status is TaskStatus.Dispatched && taskData_.AcquisitionDate < DateTime.UtcNow + delayBeforeAcquisition_)
if (taskData_.Status is TaskStatus.Dispatched && taskData_.AcquisitionDate + delayBeforeAcquisition_ > DateTime.UtcNow)

{
messageHandler_.Status = QueueMessageStatus.Postponed;
logger_.LogDebug("Wait to exceed acquisition timeout before resubmitting task");
return false;
return 16;
}

if (taskData_.Status is TaskStatus.Processing or TaskStatus.Dispatched or TaskStatus.Processed)
Expand Down Expand Up @@ -463,21 +465,21 @@ await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_,
messageHandler_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
return false;
return 17;
}
}

// if the task is running elsewhere, then the message is duplicated so we remove it from the queue
// and do not acquire the task
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 18;
}

if (taskData_.OwnerPodId == ownerPodId_ && taskData_.Status != TaskStatus.Dispatched)
{
logger_.LogInformation("Task is already managed by this agent; message likely to be duplicated");
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 19;
}

if (cancellationTokenSource_.IsCancellationRequested)
Expand All @@ -487,18 +489,18 @@ await taskTable_.ReleaseTask(taskData_,
CancellationToken.None)
.ConfigureAwait(false);
messageHandler_.Status = QueueMessageStatus.Postponed;
return false;
return 20;
}

logger_.LogDebug("Task preconditions are OK");
return true;
return 0;
}
catch (TaskNotFoundException e)
{
logger_.LogWarning(e,
"TaskId coming from message queue was not found, delete message from queue");
messageHandler_.Status = QueueMessageStatus.Processed;
return false;
return 21;
}
}

Expand Down
5 changes: 3 additions & 2 deletions Common/src/Storage/TaskTableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,10 @@ public static async Task<TaskData> AcquireTask(this ITaskTable taskTable,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

taskTable.Logger.LogInformation("Acquire task {task} on {podName}",
taskTable.Logger.LogInformation("Acquire task {task} on {podName} with {success}",
taskData.TaskId,
taskData.OwnerPodId);
taskData.OwnerPodId,
res is not null);

return res ?? await taskTable.ReadTaskAsync(taskData.TaskId,
cancellationToken)
Expand Down
17 changes: 13 additions & 4 deletions Common/tests/Helpers/TestTaskHandlerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler,
IAgentHandler agentHandler,
IQueueMessageHandler queueStorage,
CancellationTokenSource cancellationTokenSource,
ITaskTable? inputTaskTable = null,
ISessionTable? inputSessionTable = null)
ITaskTable? inputTaskTable = null,
ISessionTable? inputSessionTable = null,
ITaskProcessingChecker? taskProcessingChecker = null)
{
var logger = NullLogger.Instance;

Expand Down Expand Up @@ -165,8 +166,16 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler,
{
},
cancellationTokenSource))
.AddSingleton<DataPrefetcher>()
.AddSingleton<ITaskProcessingChecker, HelperTaskProcessingChecker>();
.AddSingleton<DataPrefetcher>();

if (taskProcessingChecker is not null)
{
builder.Services.AddSingleton(taskProcessingChecker);
}
else
{
builder.Services.AddSingleton<ITaskProcessingChecker, HelperTaskProcessingChecker>();
}

if (inputTaskTable is not null)
{
Expand Down
Loading