From 74d465deb3d5be1d780da5ecaf5095e6f9f58a1e Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Wed, 3 Apr 2024 09:57:41 -0700 Subject: [PATCH 1/9] Adding control-queue monitor with dedicated orchestrator to each control-queue. --- samples/DurableTask.Samples/Program.cs | 96 +++- .../AzureStorageOrchestrationService.cs | 474 +++++++++++++++++- ...zureStorageOrchestrationServiceSettings.cs | 21 + .../ControlQueueHeartbeatDetectionInfo.cs | 33 ++ .../ControlQueueHeartbeatTaskContext.cs | 38 ++ .../ControlQueueHeartbeatTaskInputContext.cs | 32 ++ ...ControlQueueHeartbeatTaskOrchestratorV1.cs | 181 +++++++ .../ControlQueueHeartbeat/FileWriter.cs | 59 +++ .../IControlQueueHelper.cs | 54 ++ 9 files changed, 945 insertions(+), 43 deletions(-) create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs index a246c760d..2185bec5b 100644 --- a/samples/DurableTask.Samples/Program.cs +++ b/samples/DurableTask.Samples/Program.cs @@ -22,7 +22,9 @@ namespace DurableTask.Samples using System.IO; using System.Linq; using System.Threading; + using System.Threading.Tasks; using DurableTask.AzureStorage; + using DurableTask.AzureStorage.ControlQueueHeartbeat; using DurableTask.Core; using DurableTask.Core.Tracing; using DurableTask.Samples.AverageCalculator; @@ -62,7 +64,7 @@ static void Main(string[] args) var orchestrationServiceAndClient = new AzureStorageOrchestrationService(settings); var taskHubClient = new TaskHubClient(orchestrationServiceAndClient); var taskHubWorker = new TaskHubWorker(orchestrationServiceAndClient); - + if (ArgumentOptions.CreateHub) { orchestrationServiceAndClient.CreateIfNotExistsAsync().Wait(); @@ -85,12 +87,12 @@ static void Main(string[] args) throw new ArgumentException("parameters"); } - instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(GreetingsOrchestration2), instanceId, + instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(GreetingsOrchestration2), instanceId, int.Parse(ArgumentOptions.Parameters[0])).Result; break; case "Cron": // Sample Input: "0 12 * */2 Mon" - instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(CronOrchestration), instanceId, + instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(CronOrchestration), instanceId, (ArgumentOptions.Parameters != null && ArgumentOptions.Parameters.Length > 0) ? 
ArgumentOptions.Parameters[0] : null).Result; break; case "Average": @@ -108,9 +110,9 @@ static void Main(string[] args) break; case "SumOfSquares": instance = taskHubClient.CreateOrchestrationInstanceAsync( - "SumOfSquaresOrchestration", - "V1", - instanceId, + "SumOfSquaresOrchestration", + "V1", + instanceId, File.ReadAllText("SumofSquares\\BagOfNumbers.json"), new Dictionary(1) { { "Category", "testing" } }).Result; break; @@ -129,6 +131,19 @@ static void Main(string[] args) instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId, new MigrateOrchestrationData { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result; break; + case "ControlQueueHeartbeatMonitor": + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + var taskWorker1 = TriggerTaskHubWithMonitor("workerId1", "taskHub1", cancellationTokenSource.Token); + var taskWorker2 = TriggerTaskHubWithMonitor("workerId2", "taskHub1", cancellationTokenSource.Token); + var taskWorker3 = TriggerTaskHubWithMonitor("WorkerId1", "taskHub1", cancellationTokenSource.Token); + + Task.Delay(TimeSpan.FromMinutes(5)).Wait(); + + cancellationTokenSource.Cancel(); + taskWorker1.StopAsync().Wait(); + taskWorker2.StopAsync().Wait(); + taskWorker3.StopAsync().Wait(); + break; default: throw new Exception("Unsupported Orchestration Name: " + ArgumentOptions.StartInstance); } @@ -139,7 +154,7 @@ static void Main(string[] args) { Console.WriteLine("Run RaiseEvent"); - if (string.IsNullOrWhiteSpace(ArgumentOptions.InstanceId)) + if (string.IsNullOrWhiteSpace(ArgumentOptions.InstanceId)) { throw new ArgumentException("instanceId"); } @@ -163,10 +178,10 @@ static void Main(string[] args) { taskHubWorker.AddTaskOrchestrations( typeof(GreetingsOrchestration), - typeof(GreetingsOrchestration2), + typeof(GreetingsOrchestration2), typeof(CronOrchestration), - typeof(AverageCalculatorOrchestration), - typeof(ErrorHandlingOrchestration), + typeof(AverageCalculatorOrchestration), + typeof(ErrorHandlingOrchestration), typeof(SignalOrchestration), typeof(MigrateOrchestration), typeof(SumOfSquaresOrchestration) @@ -174,14 +189,14 @@ static void Main(string[] args) taskHubWorker.AddTaskOrchestrations( new NameValueObjectCreator("SumOfSquaresOrchestration", "V1", typeof(SumOfSquaresOrchestration))); - + taskHubWorker.AddTaskActivities( - new GetUserTask(), - new SendGreetingTask(), - new CronTask(), - new ComputeSumTask(), - new GoodTask(), - new BadTask(), + new GetUserTask(), + new SendGreetingTask(), + new CronTask(), + new ComputeSumTask(), + new GoodTask(), + new BadTask(), new CleanupTask(), new EmailTask(), new SumOfSquaresTask() @@ -215,6 +230,53 @@ static void Main(string[] args) } } + private static TaskHubWorker TriggerTaskHubWithMonitor(string workerId, string taskHubName, CancellationToken cancellationToken) + { + string storageConnectionString = GetSetting("StorageConnectionString"); + + var settings = new AzureStorageOrchestrationServiceSettings + { + StorageAccountDetails = new StorageAccountDetails { ConnectionString = storageConnectionString }, + TaskHubName = taskHubName, + UseTablePartitionManagement = true, + PartitionCount = 10, + ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5), + ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(10), + ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(10), + WorkerId = workerId + }; + + var orchestrationServiceAndClient = new 
AzureStorageOrchestrationService(settings); + var taskHubClient = new TaskHubClient(orchestrationServiceAndClient); + var taskHubWorker = new TaskHubWorker(orchestrationServiceAndClient); + + var controlQueueHealthMonitor = (IControlQueueHelper)orchestrationServiceAndClient; + var task = controlQueueHealthMonitor.StartControlQueueHeartbeatMonitorAsync( + taskHubClient, + taskHubWorker, + async (orchestrationInstance, controlQueueHeartbeatTaskInputContext, controlQueueHeartbeatTaskContext, cancellationToken) => + { + FileWriter.WriteLogControlQueueProgram($"Heartbeat coming from instanceId {orchestrationInstance.InstanceId} " + + $"running for controlQueueHeartbeatTaskInputContext = [{controlQueueHeartbeatTaskInputContext}] " + + $"with orchestrator running with = [{controlQueueHeartbeatTaskContext}]."); + + if (cancellationToken.IsCancellationRequested) + { + throw new InvalidOperationException($"Dummy exception."); + } + + await Task.CompletedTask; + }, + async (workerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken) => + { + FileWriter.WriteLogControlQueueProgram($"Act on taskhubworker {workerId} [isControlQueueOwner={isControlQueueOwner}] where control queue {controlQueueName} with controlQueueHeartbeatDetectionInfo {controlQueueHeartbeatDetectionInfo.ToString()} owned by ownerId {ownerId} is either found stuck or too slow, using instanceId {instanceId}."); + await Task.CompletedTask; + }, + cancellationToken); + taskHubWorker.StartAsync().Wait(); + return taskHubWorker; + } + public static string GetSetting(string name) { string value = Environment.GetEnvironmentVariable("DurableTaskTest" + name); diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9f5d15047..1eeccfa6c 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -15,12 +15,14 @@ namespace DurableTask.AzureStorage using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Diagnostics; using System.Diagnostics.Tracing; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; + using DurableTask.AzureStorage.ControlQueueHeartbeat; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; @@ -40,10 +42,11 @@ namespace DurableTask.AzureStorage public sealed class AzureStorageOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, - IDisposable, + IDisposable, IOrchestrationServiceQueryClient, IOrchestrationServicePurgeClient, - IEntityOrchestrationService + IEntityOrchestrationService, + IControlQueueHelper { static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0]; @@ -155,7 +158,7 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement) { - throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager."); + throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. 
For improved reliability, consider using the TablePartitionManager."); } else if (this.settings.UseTablePartitionManagement) { @@ -183,6 +186,8 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings this.settings.TaskHubName.ToLowerInvariant() + "-applease", this.settings.TaskHubName.ToLowerInvariant() + "-appleaseinfo", this.settings.AppLeaseOptions); + + semaphoreSlimForControlQueueMonitorCallBack = new SemaphoreSlim(this.settings.PartitionCount); } internal string WorkerId => this.settings.WorkerId; @@ -239,11 +244,16 @@ static void ValidateSettings(AzureStorageOrchestrationServiceSettings settings) throw new ArgumentOutOfRangeException(nameof(settings), "The number of partitions must be a positive integer and no greater than 16."); } - if (string.IsNullOrEmpty(settings.TaskHubName)) + if (string.IsNullOrWhiteSpace(settings.TaskHubName)) { throw new ArgumentNullException(nameof(settings), $"A {nameof(settings.TaskHubName)} value must be configured in the settings."); } + if (settings.ControlQueueHearbeatOrchestrationInterval > settings.ControlQueueOrchHeartbeatDetectionThreshold) + { + throw new ArgumentException(nameof(settings), $"{nameof(settings.ControlQueueHearbeatOrchestrationInterval)} must not be more than {nameof(settings.ControlQueueOrchHeartbeatDetectionThreshold)}"); + } + // TODO: More validation. } @@ -630,7 +640,7 @@ internal static async Task GetControlQueuesAsync( // Need to check for leases in Azure Table Storage. Scale Controller calls into this method. int partitionCount; Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName); - + // Check if table partition manager is used. If so, get partition count from table. // Else, get the partition count from the blobs. if (await partitionTable.ExistsAsync()) @@ -872,12 +882,12 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) var messages = session.CurrentMessageBatch; TraceContextBase parentTraceContext = null; bool foundEventRaised = false; - foreach(var message in messages) + foreach (var message in messages) { if (message.SerializableTraceContext != null) { var traceContext = TraceContextBase.Restore(message.SerializableTraceContext); - switch(message.TaskMessage.Event) + switch (message.TaskMessage.Event) { // Dependency Execution finished. case TaskCompletedEvent tc: @@ -901,15 +911,16 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) break; default: // When internal error happens, multiple message could come, however, it should not be prioritized. 
- if (parentTraceContext == null || + if (parentTraceContext == null || parentTraceContext.OrchestrationTraceContexts.Count < traceContext.OrchestrationTraceContexts.Count) { parentTraceContext = traceContext; } break; - } - } else + } + } + else { // In this case, we set the parentTraceContext later in this method @@ -917,7 +928,7 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) { foundEventRaised = true; } - } + } } // When EventRaisedEvent is present, it will not, out of the box, share the same operation @@ -934,12 +945,12 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) static bool IsActivityOrOrchestrationFailedOrCompleted(IList messages) { - foreach(var message in messages) + foreach (var message in messages) { if (message.TaskMessage.Event is DurableTask.Core.History.SubOrchestrationInstanceCompletedEvent || message.TaskMessage.Event is DurableTask.Core.History.SubOrchestrationInstanceFailedEvent || message.TaskMessage.Event is DurableTask.Core.History.TaskCompletedEvent || - message.TaskMessage.Event is DurableTask.Core.History.TaskFailedEvent || + message.TaskMessage.Event is DurableTask.Core.History.TaskFailedEvent || message.TaskMessage.Event is DurableTask.Core.History.TimerFiredEvent) { return true; @@ -1133,13 +1144,13 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( // Correlation CorrelationTraceClient.Propagate(() => + { + // In case of Extended Session, Emit the Dependency Telemetry. + if (workItem.IsExtendedSession) { - // In case of Extended Session, Emit the Dependency Telemetry. - if (workItem.IsExtendedSession) - { - this.TrackExtendedSessionDependencyTelemetry(session); - } - }); + this.TrackExtendedSessionDependencyTelemetry(session); + } + }); TraceContextBase currentTraceContextBaseOnComplete = null; CorrelationTraceClient.Propagate(() => @@ -1224,7 +1235,7 @@ static bool DependencyTelemetryStarted( TaskMessage continuedAsNewMessage, OrchestrationState orchestrationState) { - return + return (outboundMessages.Count != 0 || orchestratorMessages.Count != 0 || timerMessages.Count != 0) && (orchestrationState.OrchestrationStatus != OrchestrationStatus.Completed) && (orchestrationState.OrchestrationStatus != OrchestrationStatus.Failed); @@ -1266,7 +1277,7 @@ static TraceContextBase CreateOrRestoreRequestTraceContextWithDependencyTracking bool dependencyTelemetryStarted) { TraceContextBase currentTraceContextBaseOnComplete = null; - + if (dependencyTelemetryStarted) { // DependencyTelemetry will be included on an outbound queue @@ -1276,7 +1287,7 @@ static TraceContextBase CreateOrRestoreRequestTraceContextWithDependencyTracking } else { - switch(orchestrationState.OrchestrationStatus) + switch (orchestrationState.OrchestrationStatus) { case OrchestrationStatus.Completed: case OrchestrationStatus.Failed: @@ -1692,7 +1703,7 @@ public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, Orch { throw new InvalidOperationException($"An Orchestration instance with the status {existingInstance.State.OrchestrationStatus} already exists."); } - + return; } @@ -1993,7 +2004,7 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync while (!cancellationToken.IsCancellationRequested) { OrchestrationState? 
state = await this.GetOrchestrationStateAsync(instanceId, executionId); - + if (state != null && state.OrchestrationStatus != OrchestrationStatus.Running && state.OrchestrationStatus != OrchestrationStatus.Suspended && @@ -2007,9 +2018,9 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync } return state; } - + timeout -= statusPollingInterval; - + // For a user-provided timeout of `TimeSpan.Zero`, // we want to check the status of the orchestration once and then return. // Therefore, we check the timeout condition after the status check. @@ -2046,6 +2057,417 @@ public Task DownloadBlobAsync(string blobUri) #endregion + #region IControlQueueHelper + + /// + public async Task StartControlQueueHeartbeatMonitorAsync( + TaskHubClient taskHubClient, + TaskHubWorker taskHubWorker, + Func callBackHeartOrchAsync, + Func callBackControlQueueValidation, + CancellationToken cancellationToken) + { + // Validate if taskHubClient and taskHubWorker used is of correct type and settings. + ValidateTaskHubClient(taskHubClient); + ValidateTaskHubWorker(taskHubWorker); + + Stopwatch stopwatch = Stopwatch.StartNew(); + + // Schedule orchestrator instance for each control-queue. + await ScheduleControlQueueHeartbeatOrchestrations(taskHubClient, false); + + // Register orchestrator for control-queue heartbeat. + RegisterControlQueueHeartbeatOrchestration(taskHubWorker, callBackHeartOrchAsync); + + // Gets control-queue name to orchestrator instance id dictionary. + Dictionary controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo(); + + // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue. + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken); + + while (!cancellationToken.IsCancellationRequested) + { + var localCancellationTokenSource = new CancellationTokenSource(this.settings.ControlQueueOrchHeartbeatDetectionInterval); + var linkedCancelationTokenSrc = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCancellationTokenSource.Token); + + if (!cancellationToken.IsCancellationRequested) + { + Dictionary controlQueueOwnerIds = new Dictionary(); + + try + { + // Gets control-queue name to owner id dictionary. + controlQueueOwnerIds = await GetControlQueueOwnerIds(); + } + catch (Exception ex) + { + // [Logs] Add exception details for failure at fetching owners of control-queue. + FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" + + $"exception: {ex.ToString()}" + + $"message: failed to fetch owner ids for control-queues."); + } + + Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) => + { + var controlQueueName = controlQueueOrchInstanceId.Key; + var instanceId = controlQueueOrchInstanceId.Value; + + if ((controlQueueOwnerIds.Count == 0)) + { + // If controlQueueOwnerIds was failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed. + await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, linkedCancelationTokenSrc.Token); + } + else + { + var ownerId = controlQueueOwnerIds[controlQueueName]; + + // Fetch orchestration instance and validate control-queue stuck. 
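+ // Sketch of the decision flow implemented in ValidateControlQueueOrchestrationAsync below:
+ // - state fetch exceeds ControlQueueOrchHeartbeatDetectionInterval -> OrchestrationInstanceFetchTimedOut
+ // - fetched state is null -> OrchestrationInstanceNotFound
+ // - (now - LastUpdatedTime) > ControlQueueOrchHeartbeatDetectionThreshold -> OrchestrationInstanceStuck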
+ await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, linkedCancelationTokenSrc.Token); + } + }); + } + + // Waiting for detection interval. + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, linkedCancelationTokenSrc.Token); + } + } + + /// + public async Task ScheduleControlQueueHeartbeatOrchestrations(TaskHubClient taskHubClient, bool force = false) + { + // Validate taskhubclient. + ValidateTaskHubClient(taskHubClient); + + // Get control-queue to instance-id map. + Dictionary controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo(); + + foreach (var controlQueueOrchInstanceId in controlQueueOrchInstanceIds) + { + var controlQueueName = controlQueueOrchInstanceId.Key; + var instanceId = controlQueueOrchInstanceId.Value; + + if (!force) + { + var state = await taskHubClient.GetOrchestrationStateAsync(instanceId); + + // if the orchestration instance is not completed, then don't add orchestration instance again. + if (state != null && ( + state.OrchestrationStatus == OrchestrationStatus.Pending + || state.OrchestrationStatus == OrchestrationStatus.Running + || state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew + )) + { + // [Logs] Orchestration instance already exists with state and orchestration id. + FileWriter.WriteLogControlQueueMonitor($"ControlQueueHeartbeatOrchestrationsAlreadyQueued" + + $"controlQueueName: {controlQueueName}" + + $"instanceId: {instanceId}" + + $"message: orchestration already present in incomplete state."); + + continue; + } + } + + // Creating input context for control queue with control-queue name, taskhub name, and partition count. + var orchInput = new ControlQueueHeartbeatTaskInputContext(controlQueueName, this.settings.TaskHubName, this.settings.PartitionCount); + + // Creating the orchestration instance. + await taskHubClient.CreateOrchestrationInstanceAsync( + ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName, + ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion, + controlQueueOrchInstanceId.Value, + orchInput); + } + } + + /// + public void RegisterControlQueueHeartbeatOrchestration(TaskHubWorker taskHubWorker, Func callBack) + { + ValidateTaskHubWorker(taskHubWorker); + + if (!isControlQueueOrchestratorsRegistered) + { + lock (controlQueueOrchestratorsRegistrationLock) + { + if (!isControlQueueOrchestratorsRegistered) + { + // Creating initial context object for orchestration. + // This context will be available for each orchestration ran of this type in the taskhubworker. + ControlQueueHeartbeatTaskOrchestratorV1 controlQueueHeartbeatTaskOrchestrator = new ControlQueueHeartbeatTaskOrchestratorV1( + new ControlQueueHeartbeatTaskContext( + this.settings.TaskHubName, + this.settings.PartitionCount), + this.settings.ControlQueueHearbeatOrchestrationInterval, + callBack); + + var objectCreator = new NameValueObjectCreator(ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName, ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion, controlQueueHeartbeatTaskOrchestrator); + + // Registering task orchestration. + taskHubWorker.AddTaskOrchestrations(objectCreator); + } + } + } + } + + /// + public string GetControlQueueInstanceId(int[] controlQueueNumbers, string instanceIdPrefix = "") + { + var instanceId = string.Empty; + int suffix = 0; + bool foundInstanceId = false; + + var partitionCount = this.settings.PartitionCount; + + // Validating control-queue numbers are valid. 
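+ // (How the targeting below works: DTF binds an orchestration instance to a control-queue by
+ // hashing the instance id: controlQueueNumber = (int)Fnv1aHashHelper.ComputeHash(instanceId) % partitionCount.
+ // The while-loop below appends an increasing numeric suffix to instanceIdPrefix until the
+ // hash lands on one of the requested control-queues.)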
+ foreach (var controlQueueNumber in controlQueueNumbers)
+ {
+ if (controlQueueNumber < 0 || controlQueueNumber >= partitionCount)
+ {
+ throw new InvalidOperationException($"{nameof(controlQueueNumbers)} has at least one value which is not in the range [0,{partitionCount - 1}] with partition count = {partitionCount}.");
+ }
+ }
+
+ // Keep incrementing the suffix until the generated instance id lands on one of the provided control-queues.
+ while (!foundInstanceId)
+ {
+ suffix++;
+ instanceId = $"{instanceIdPrefix}{suffix}";
+ var controlQueueNumber = (int)Fnv1aHashHelper.ComputeHash(instanceId) % partitionCount;
+
+ if (controlQueueNumbers.Any(x => x == controlQueueNumber))
+ {
+ foundInstanceId = true;
+ }
+ }
+
+ return instanceId;
+ }
+
+ private void ValidateTaskHubWorker(TaskHubWorker taskHubWorker)
+ {
+ if (!(taskHubWorker.orchestrationService is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubWorker))
+ {
+ throw new InvalidOperationException($"TaskHubWorker is not using AzureStorageOrchestrationService.");
+ }
+
+ if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationServiceTaskHubWorker.settings.TaskHubName)
+ && this.settings.PartitionCount == azureStorageOrchestrationServiceTaskHubWorker.settings.PartitionCount))
+ {
+ throw new InvalidOperationException($"TaskHubWorker's AzureStorageOrchestrationService has a TaskHubName and/or PartitionCount mismatch.");
+ }
+ }
+
+ private void ValidateTaskHubClient(TaskHubClient taskHubClient)
+ {
+ if (!(taskHubClient.ServiceClient is AzureStorageOrchestrationService azureStorageOrchestrationService))
+ {
+ throw new InvalidOperationException($"TaskHubClient is not using AzureStorageOrchestrationService.");
+ }
+
+ if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationService.settings.TaskHubName)
+ && this.settings.PartitionCount == azureStorageOrchestrationService.settings.PartitionCount))
+ {
+ throw new InvalidOperationException($"TaskHubClient's AzureStorageOrchestrationService has a TaskHubName and/or PartitionCount mismatch.");
+ }
+ }
+
+ private async Task ValidateControlQueueOrchestrationAsync(
+ TaskHubClient taskHubClient,
+ Func<string, string?, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBack,
+ string? ownerId,
+ string controlQueueName,
+ string instanceId,
+ CancellationToken cancellationToken)
+ {
+ DateTime currentTimeUTC = DateTime.UtcNow;
+
+ // Time-box the state fetch.
+ var timeBoxedActivity = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);
+ var orchInstanceTask = taskHubClient.GetOrchestrationStateAsync(instanceId);
+
+ await Task.WhenAny(timeBoxedActivity, orchInstanceTask);
+
+ if (!orchInstanceTask.IsCompleted)
+ {
+ // Orchestration state fetch timed out.
+ FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceFetchTimedOut" +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"message: orchestration instance couldn't be fetched in time.");
+
+ // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceFetchTimedOut.
+ await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceFetchTimedOut, cancellationToken);
+ }
+ else if (orchInstanceTask.Result == null)
+ {
+ // Orchestration instance for the control-queue was not found.
+ FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceNotFound" +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"message: orchestration instance not found.");
+
+ // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceNotFound.
+ await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceNotFound, cancellationToken);
+ }
+ else
+ {
+ var orchInstance = orchInstanceTask.Result;
+
+ var lastUpdatedTimeUTC = orchInstance.LastUpdatedTime;
+
+ var timeSinceLastUpdate = currentTimeUTC - lastUpdatedTimeUTC;
+
+ // If the time since the last update exceeds the threshold, log 'OrchestrationInstanceStuck' and run the callback.
+ if (this.settings.ControlQueueOrchHeartbeatDetectionThreshold < timeSinceLastUpdate)
+ {
+ FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceStuck" +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"lastUpdatedTimeUTC: {lastUpdatedTimeUTC.ToLongTimeString()}" +
+ $"currentTimeUTC: {currentTimeUTC.ToLongTimeString()}" +
+ $"message: orchestration instance is stuck.");
+
+ await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceStuck, cancellationToken);
+ }
+ }
+ }
+
+ private async Task RunCallBack(
+ Func<string, string?, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBack,
+ string? ownerId,
+ string controlQueueName,
+ string instanceId,
+ ControlQueueHeartbeatDetectionInfo controlQueueHeartbeatDetectionInfo,
+ CancellationToken cancellationToken)
+ {
+ Stopwatch stopwatch = Stopwatch.StartNew();
+ bool gotSemaphore = false;
+
+ try
+ {
+ // Wait on the semaphore with a timeout to avoid thread hogging and starvation.
+ gotSemaphore = await semaphoreSlimForControlQueueMonitorCallBack.WaitAsync(this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+
+ var isControlQueueOwner = this.settings.WorkerId.Equals(ownerId);
+
+ if (gotSemaphore)
+ {
+ var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);
+ var callBackTask = callBack(this.settings.WorkerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken);
+
+ // Do not allow callBackTask to run forever: resume once either the callback or the delay completes.
+ await Task.WhenAny(callBackTask, delayTask);
+
+ if (!callBackTask.IsCompleted)
+ {
+ // [Logs] Add log for long running callback.
+ FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackTerminated" +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"duration: {stopwatch.ElapsedMilliseconds}" +
+ $"message: callback is taking too long to complete.");
+ }
+ }
+ }
+ // Not letting any exception propagate beyond this point.
+ catch (Exception ex)
+ {
+ FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackFailed " +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"Exception: {ex.ToString()} " +
+ $"ElapsedMilliseconds: {stopwatch.ElapsedMilliseconds} ");
+ }
+ // Ensure the semaphore is released.
+ finally
+ {
+ if (gotSemaphore)
+ {
+ semaphoreSlimForControlQueueMonitorCallBack.Release();
+ }
+ }
+ }
+
+ private async Task<Dictionary<string, string>> GetControlQueueOwnerIds()
+ {
+ if (this.settings.UseTablePartitionManagement)
+ {
+ return GetControlQueueOwnerIdsFromTableLeases();
+ }
+ else
+ {
+ return await GetControlQueueOwnerIdsFromBlobLeasesAsync();
+ }
+ }
+
+ private Dictionary<string, string> GetControlQueueOwnerIdsFromTableLeases()
+ {
+ // Get table leases.
+ var ownershipLeases = this.ListTableLeases();
+
+ string ownershipInfo = string.Empty;
+
+ Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+ // Transform the table leases into a control-queue name to owner-id map.
+ foreach (var ownershipLease in ownershipLeases)
+ {
+ var controlQueueName = ownershipLease.RowKey ?? string.Empty;
+ controlQueueOwnerIds[controlQueueName] = ownershipLease.CurrentOwner;
+ ownershipInfo += $"[PartitionId={ownershipLease.RowKey}, Owner={ownershipLease.CurrentOwner}, NextOwner={ownershipLease.NextOwner}, OwnedSince={ownershipLease.OwnedSince}]";
+ }
+
+ FileWriter.WriteLogControlQueueMonitor($"OwnershipInfo for all control queues: {ownershipInfo}");
+ return controlQueueOwnerIds;
+ }
+
+ private async Task<Dictionary<string, string>> GetControlQueueOwnerIdsFromBlobLeasesAsync()
+ {
+ // Get blob leases.
+ var ownershipLeases = await this.ListBlobLeasesAsync();
+
+ string ownershipInfo = string.Empty;
+
+ Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+ // Transform the blob leases into a control-queue name to owner-id map.
+ foreach (var ownershipLease in ownershipLeases)
+ {
+ controlQueueOwnerIds[ownershipLease.PartitionId] = ownershipLease.Owner;
+ ownershipInfo += $"[PartitionId={ownershipLease.PartitionId}, Owner={ownershipLease.Owner}, Token={ownershipLease.Token}, Epoch={ownershipLease.Epoch}]";
+ }
+
+ FileWriter.WriteLogControlQueueMonitor($"OwnershipInfo for all control queues: {ownershipInfo}");
+ return controlQueueOwnerIds;
+ }
+
+ private Dictionary<string, string> GetControlQueueToInstanceIdInfo()
+ {
+ var partitionCount = this.settings.PartitionCount;
+ var controlQueueOrchInstanceIds = new Dictionary<string, string>();
+
+ // Generate the control-queue name to instance id map.
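+ // Illustrative shape of the result (assuming the usual "{taskhub}-control-NN" queue
+ // naming and partitionCount = 4):
+ // "mytaskhub-control-00" -> "DTF_PC_4_CQ_0_<suffix>", ..., "mytaskhub-control-03" -> "DTF_PC_4_CQ_3_<suffix>"
+ // where <suffix> is the first positive integer whose hash targets that control-queue.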
+ for (int controlQueueNumber = 0; controlQueueNumber < partitionCount; controlQueueNumber++)
+ {
+ var controlQueueName = GetControlQueueName(this.settings.TaskHubName, controlQueueNumber);
+ var instanceIdPrefix = $"DTF_PC_{partitionCount}_CQ_{controlQueueNumber}_";
+
+ string instanceId = GetControlQueueInstanceId(new int[] { controlQueueNumber }, instanceIdPrefix);
+
+ controlQueueOrchInstanceIds[controlQueueName] = instanceId;
+ }
+
+ return controlQueueOrchInstanceIds;
+ }
+
+ private SemaphoreSlim semaphoreSlimForControlQueueMonitorCallBack;
+
+ private bool isControlQueueOrchestratorsRegistered = false;
+
+ private object controlQueueOrchestratorsRegistrationLock = new object();
+
+ #endregion IControlQueueHelper
+
 // TODO: Change this to a sticky assignment so that partition count changes can
 // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1
 async Task<ControlQueue> GetControlQueueAsync(string instanceId)
diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 609a8b35d..107946837 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -32,6 +32,12 @@ public class AzureStorageOrchestrationServiceSettings
 internal static readonly TimeSpan DefaultMaxQueuePollingInterval = TimeSpan.FromSeconds(30);
+ internal static readonly TimeSpan DefaultControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(60);
+
+ internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(600);
+
+ internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(600);
+
 LogHelper logHelper;
 ///
@@ -177,6 +183,21 @@ public class AzureStorageOrchestrationServiceSettings
 ///
 public TimeSpan MaxQueuePollingInterval { get; set; } = DefaultMaxQueuePollingInterval;
+ ///
+ /// Time interval between two consecutive runs of the control-queue heartbeat orchestration.
+ ///
+ public TimeSpan ControlQueueHearbeatOrchestrationInterval { get; set; } = DefaultControlQueueHearbeatOrchestrationInterval;
+
+ ///
+ /// Time interval between two consecutive control-queue heartbeat detection checks.
+ ///
+ public TimeSpan ControlQueueOrchHeartbeatDetectionInterval { get; set; } = DefaultControlQueueOrchHeartbeatDetectionInterval;
+
+ ///
+ /// Threshold on the time since the last heartbeat beyond which the control-queue orchestration is considered stuck.
+ ///
+ public TimeSpan ControlQueueOrchHeartbeatDetectionThreshold { get; set; } = DefaultControlQueueOrchHeartbeatDetectionThreshold;
+
 ///
 /// If true, takes a lease on the task hub container, allowing for only one app to process messages in a task hub at a time.
 ///
diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs
new file mode 100644
index 000000000..257e34ad2
--- /dev/null
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs
@@ -0,0 +1,33 @@
+namespace DurableTask.AzureStorage.ControlQueueHeartbeat
+{
+ ///
+ /// Control-queue heartbeat detection information.
+ ///
+ public enum ControlQueueHeartbeatDetectionInfo
+ {
+ ///
+ /// Default value.
+ ///
+ Unknown,
+
+ ///
+ /// The orchestration instance for the control-queue is stuck.
+ ///
+ OrchestrationInstanceStuck,
+
+ ///
+ /// Fetching the expected orchestration instance for the control-queue timed out.
+ /// + OrchestrationInstanceFetchTimedOut, + + /// + /// Expected orchestration instance for control-queue not found. + /// + OrchestrationInstanceNotFound, + + /// + /// Control-queue owner information fetch failed. + /// + ControlQueueOwnerFetchFailed + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs new file mode 100644 index 000000000..135f2c6c0 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs @@ -0,0 +1,38 @@ +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Context for ControlQueueHeartbeat task orchestrator. + /// + public class ControlQueueHeartbeatTaskContext + { + /// + /// Name of taskhub. + /// + public string TaskHubName { get; private set; } + + /// + /// Number of partitions. + /// + public int PartitionCount { get; private set; } + + /// + /// ControlQueueHeartbeatTaskContext constructor. + /// + /// Name of taskhub. + /// Number of partitions. + public ControlQueueHeartbeatTaskContext(string taskhubName, int partitionCount) + { + this.TaskHubName = taskhubName; + this.PartitionCount = partitionCount; + } + + /// + /// Returns a string that represents the OrchestrationInstance. + /// + /// A string that represents the current object. + public override string ToString() + { + return $"[TaskHubName={TaskHubName}, PartitionCount = {this.PartitionCount}]"; + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs new file mode 100644 index 000000000..0494d489d --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs @@ -0,0 +1,32 @@ +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Input for ControlQueueHeartbeat orchestration instance. + /// + public class ControlQueueHeartbeatTaskInputContext : ControlQueueHeartbeatTaskContext + { + /// + /// Control-queue name. + /// + public string ControlQueueName { get; private set; } + + /// + /// ControlQueueHeartbeatTaskInputContext constructor. + /// + /// Name of control queue. + /// Name of taskhub. + /// Name of partitionCount. + public ControlQueueHeartbeatTaskInputContext(string controlQueueName, string taskhubName, int partitionCount) + : base(taskhubName, partitionCount) + { + this.ControlQueueName = controlQueueName; + } + + /// Returns a string that represents the OrchestrationInstance. + /// A string that represents the current object. + public override string ToString() + { + return $"[ControlQueueName={ControlQueueName}, TaskHubName={TaskHubName}, PartitionCount = {this.PartitionCount}]"; + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs new file mode 100644 index 000000000..7fc5fbcba --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs @@ -0,0 +1,181 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Control-queue heartbeat orchestrator. 
+ /// This is supposed to be initialized with ControlQueueHeartbeatTaskContext informing orchestrator about configuration of taskhubworker and heartbeat interval. + /// + internal class ControlQueueHeartbeatTaskOrchestratorV1 : TaskOrchestration + { + public const string OrchestrationName = "ControlQueueHeartbeatTaskOrchestrator"; + + public const string OrchestrationVersion = "V1"; + + private ControlQueueHeartbeatTaskContext controlQueueHeartbeatTaskContextInit; + + private TimeSpan controlQueueHearbeatOrchestrationInterval; + + private Func callBack; + + private SemaphoreSlim semaphoreSlim; + + /// + /// ControlQueueHeartbeatTaskOrchestratorV1 constructor. + /// + /// ControlQueueHeartbeatTaskContext object, informs about configuration of taskhubworker orchestrator is running in. + /// Interval between two heartbeats. + /// + /// A callback to allow user process/emit custom metrics for heartbeat execution. + /// + /// Throws if provided ControlQueueHeartbeatTaskContext object is null. + internal ControlQueueHeartbeatTaskOrchestratorV1( + ControlQueueHeartbeatTaskContext controlQueueHeartbeatTaskContext, + TimeSpan controlQueueHearbeatOrchestrationInterval, + Func callBack) + { + this.controlQueueHeartbeatTaskContextInit = controlQueueHeartbeatTaskContext ?? throw new ArgumentNullException(nameof(controlQueueHeartbeatTaskContext)); + this.controlQueueHearbeatOrchestrationInterval = controlQueueHearbeatOrchestrationInterval; + this.callBack = callBack; + + // At worst case, allowing max of 2 heartbeat of a control-queue to run callbacks. + this.semaphoreSlim = new SemaphoreSlim(2 * controlQueueHeartbeatTaskContext.PartitionCount); + } + + public override async Task RunTask(OrchestrationContext context, ControlQueueHeartbeatTaskInputContext controlQueueHeartbeatTaskContextInput) + { + // Stopwatch to calculate time to complete orchestrator. + Stopwatch stopwatchOrch = Stopwatch.StartNew(); + + // Checks for input being null and complete gracefully. + if (controlQueueHeartbeatTaskContextInput == null) + { + // [Logs] Add log for failure of the orchestrator. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorFailed." + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"duration: {stopwatchOrch.ElapsedMilliseconds}" + + $"message: controlQueueHeartbeatTaskContextInput is null. Completing the orchestration."); + + return "Failed"; + } + + var isOrchestratorRunningInCorrectContext = controlQueueHeartbeatTaskContextInput.PartitionCount == controlQueueHeartbeatTaskContextInit.PartitionCount + && controlQueueHeartbeatTaskContextInit.TaskHubName.Equals(controlQueueHeartbeatTaskContextInput.TaskHubName); + + // Checks if the context of orchestrator instance and orchestrator mismatch and complete gracefully. + if (!isOrchestratorRunningInCorrectContext) + { + // [Logs] Add log for mistmatch in context. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorContextMismatch" + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"duration: {stopwatchOrch.ElapsedMilliseconds}" + + $"message: the partition count and taskhub information are not matching."); + + return "Failed"; + } + + // Waiting for heartbeat orchestration interval. 
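+ // Eternal-orchestration pattern: sleep on a durable timer for one heartbeat interval,
+ // do a small amount of non-replayed work, then ContinueAsNew at the end of RunTask so
+ // the history stays short and the heartbeat repeats indefinitely.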
+ await context.CreateTimer(context.CurrentUtcDateTime.Add(controlQueueHearbeatOrchestrationInterval), true); + + // Ensuring this section doesn't run again. + // This queues the user provided callback without waiting for it to finish. + // This is to keep heartbeat orchestrator thin and fast. + if (!context.IsReplaying) + { + // No wait to complete provided delegate. The current orchestrator need to be very thin and quick to run. + bool isQueued = ThreadPool.QueueUserWorkItem(async (_) => + { + var gotSemaphore = await semaphoreSlim.WaitAsync(controlQueueHearbeatOrchestrationInterval); + Stopwatch stopwatch = Stopwatch.StartNew(); + + try + { + if (gotSemaphore) + { + var cancellationTokenSrc = new CancellationTokenSource(controlQueueHearbeatOrchestrationInterval); + var delayTask = Task.Delay(controlQueueHearbeatOrchestrationInterval, cancellationTokenSrc.Token); + var callBackTask = this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, cancellationTokenSrc.Token); + + // Do not allow callBackTask to run forever. + await Task.WhenAll(callBackTask, delayTask); + + if (!callBackTask.IsCompleted) + { + // [Logs] Add log for long running callback. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackTerminated" + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"duration: {stopwatch.ElapsedMilliseconds}" + + $"message: callback is taking too long to cmplete."); + } + } + else + { + // [Logs] Add log for too many callbacks running. Share the semaphore-count for #callBacks allowed, and wait time for semaphore; and ask to reduce the run-time for callback. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback" + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"duration: {stopwatch.ElapsedMilliseconds}" + + $"message: too many active callbacks, skipping runnning this instance of callback."); + + } + } + // Not throwing anything beyond this. + catch (Exception ex) + { + // [Logs] Add exception details for callback failure. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackFailure" + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"exception: {ex.ToString()}" + + $"duration: {stopwatch.ElapsedMilliseconds}" + + $"message: the partition count and taskhub information are not matching."); + } + // ensuring semaphore is released. + finally + { + if (gotSemaphore) + { + semaphoreSlim.Release(); + } + } + }); + + if (!isQueued) + { + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorV1 Stopping OrchestrationInstance:{context.OrchestrationInstance} controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}"); + + // [Logs] Add log for a heartbeat message from current instance. 
+ FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued" + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"duration: {stopwatchOrch.ElapsedMilliseconds}" + + $"message: Callback for orchestrator could not be queued."); + } + } + + // [Logs] Add log for a heartbeat message from current instance. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestrator " + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + + $"duration: {stopwatchOrch.ElapsedMilliseconds}" + + $"message: Sending signal for control-queue heartbeat."); + + context.ContinueAsNew(controlQueueHeartbeatTaskContextInput); + + return "Succeeded"; + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs new file mode 100644 index 000000000..04f35cc78 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// + /// + public class FileWriter + { + static string timeTicks = DateTime.UtcNow.Ticks.ToString(); + static string path = @$"C:\DTF\v2.16.1\Logs_{timeTicks}.txt"; + static string pathControlQueueMonitor = @$"C:\DTF\v2.16.1\Logs_ControlQueueMonitor_{timeTicks}.txt"; + static string pathControlQueueOrch = @$"C:\DTF\v2.16.1\Logs_ControlQueueOrch_{timeTicks}.txt"; + static string pathControlQueueProgram = @$"C:\DTF\v2.16.1\Logs_ControlQueueProgram_{timeTicks}.txt"; + + static object obj = new object(); + + private static void WriteLog(string path, string msg) + { + var modMsg = $"[{DateTime.UtcNow.ToLongTimeString()}] : {msg}" + Environment.NewLine; + Console.WriteLine(modMsg); + + lock (obj) + { + File.AppendAllText(path, modMsg); + } + } + + /// + /// + /// + /// + public static void WriteLogControlQueueMonitor(string msg) + { + WriteLog(pathControlQueueMonitor, msg); + } + + /// + /// + /// + /// + public static void WriteLogControlQueueOrch(string msg) + { + WriteLog(pathControlQueueOrch, msg); + } + + /// + /// + /// + /// + public static void WriteLogControlQueueProgram(string msg) + { + WriteLog(pathControlQueueProgram, msg); + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs new file mode 100644 index 000000000..680171176 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// Monitors control queue health for orchestrator's processing. Make sure to provide same setting for taskhubclient, taskhubworker and IControlQueueHealthMonitor. + public interface IControlQueueHelper + { +#nullable enable + /// + /// Sets up the TaskHub client and worker for control-queue heartbeat and detects if any of heartbeat orchestration running on each control-queue is not running. 
+ /// + /// TaskHubClient object. + /// TaskHubWorker object. + /// Callback to run with each execution of the orchestrator of type . + /// Callback to run with each time the detects fault of type . + /// Cancellation token. + /// Task result. + Task StartControlQueueHeartbeatMonitorAsync( + TaskHubClient taskHubClient, + TaskHubWorker taskHubWorker, + Func callBackHeartOrchAsync, + Func callBackControlQueueValidation, + CancellationToken cancellationToken); +#nullable disable + + /// + /// Adds orchestrator instances of type for each control queue. + /// + /// TaskHubClient object. + /// If true, creates new instances of orchestrator, otherwise creates only if there is no running instance of orchestrator with same instance id.. + /// Task result. + Task ScheduleControlQueueHeartbeatOrchestrations(TaskHubClient taskHubClient, bool force = false); + + /// + /// Adds orchestrator instance of type to TaskHubWorkerObject. + /// + /// TaskHubWorker object. + /// Callback to run with each execution of the orchestrator of type . + void RegisterControlQueueHeartbeatOrchestration( + TaskHubWorker taskHubWorker, + Func callBackHeartOrchAsync); + + /// + /// Gets instanceId which is targeted for mentioned control-queue names. + /// + /// Array of controlQueueNumbers. + /// InstanceId prefix. + /// InstanceId for control-queue. + string GetControlQueueInstanceId(int[] controlQueueNumbers, string instanceIdPrefix = ""); + } +} From 04560d5580b31c2181961deb43d225660a85d720 Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Wed, 3 Apr 2024 17:19:17 -0700 Subject: [PATCH 2/9] Adding changes to avoid failures due to intentional task cancelation. --- .../AzureStorageOrchestrationService.cs | 66 +++++++++++-------- ...ControlQueueHeartbeatTaskOrchestratorV1.cs | 17 ++--- 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 1eeccfa6c..dd69becac 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2090,41 +2090,51 @@ public async Task StartControlQueueHeartbeatMonitorAsync( var localCancellationTokenSource = new CancellationTokenSource(this.settings.ControlQueueOrchHeartbeatDetectionInterval); var linkedCancelationTokenSrc = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCancellationTokenSource.Token); - if (!cancellationToken.IsCancellationRequested) + try { - Dictionary controlQueueOwnerIds = new Dictionary(); - - try - { - // Gets control-queue name to owner id dictionary. - controlQueueOwnerIds = await GetControlQueueOwnerIds(); - } - catch (Exception ex) - { - // [Logs] Add exception details for failure at fetching owners of control-queue. - FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" + - $"exception: {ex.ToString()}" + - $"message: failed to fetch owner ids for control-queues."); - } - - Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) => + if (!cancellationToken.IsCancellationRequested) { - var controlQueueName = controlQueueOrchInstanceId.Key; - var instanceId = controlQueueOrchInstanceId.Value; + Dictionary controlQueueOwnerIds = new Dictionary(); - if ((controlQueueOwnerIds.Count == 0)) + try { - // If controlQueueOwnerIds was failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed. 
- await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, linkedCancelationTokenSrc.Token); + // Gets control-queue name to owner id dictionary. + controlQueueOwnerIds = await GetControlQueueOwnerIds(); } - else + catch (Exception ex) { - var ownerId = controlQueueOwnerIds[controlQueueName]; - - // Fetch orchestration instance and validate control-queue stuck. - await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, linkedCancelationTokenSrc.Token); + // [Logs] Add exception details for failure at fetching owners of control-queue. + FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" + + $"exception: {ex.ToString()}" + + $"message: failed to fetch owner ids for control-queues."); } - }); + + Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) => + { + var controlQueueName = controlQueueOrchInstanceId.Key; + var instanceId = controlQueueOrchInstanceId.Value; + + if ((controlQueueOwnerIds.Count == 0)) + { + // If controlQueueOwnerIds was failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed. + await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, linkedCancelationTokenSrc.Token); + } + else + { + var ownerId = controlQueueOwnerIds[controlQueueName]; + + // Fetch orchestration instance and validate control-queue stuck. + await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, linkedCancelationTokenSrc.Token); + } + }); + } + } + catch(TaskCanceledException ex) + { + // [Logs] Add exception details for task cancelation. This implies, one iteration took longer than time provided. + FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorTaskCanceled " + + $"exception: {ex.ToString()}" + + $"message: failed to complete full iteration within time provided."); } // Waiting for detection interval. diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs index 7fc5fbcba..b0e1818d0 100644 --- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs @@ -99,8 +99,8 @@ public override async Task RunTask(OrchestrationContext context, Control { if (gotSemaphore) { - var cancellationTokenSrc = new CancellationTokenSource(controlQueueHearbeatOrchestrationInterval); - var delayTask = Task.Delay(controlQueueHearbeatOrchestrationInterval, cancellationTokenSrc.Token); + var cancellationTokenSrc = new CancellationTokenSource(); + var delayTask = Task.Delay(controlQueueHearbeatOrchestrationInterval); var callBackTask = this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, cancellationTokenSrc.Token); // Do not allow callBackTask to run forever. @@ -109,37 +109,38 @@ public override async Task RunTask(OrchestrationContext context, Control if (!callBackTask.IsCompleted) { // [Logs] Add log for long running callback. 
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackTerminated" + + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackTerminated " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + $"duration: {stopwatch.ElapsedMilliseconds}" + $"message: callback is taking too long to cmplete."); } + + cancellationTokenSrc.Cancel(); } else { // [Logs] Add log for too many callbacks running. Share the semaphore-count for #callBacks allowed, and wait time for semaphore; and ask to reduce the run-time for callback. - FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback" + + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + $"duration: {stopwatch.ElapsedMilliseconds}" + $"message: too many active callbacks, skipping runnning this instance of callback."); - } } // Not throwing anything beyond this. catch (Exception ex) { // [Logs] Add exception details for callback failure. - FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackFailure" + + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackFailure " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + $"exception: {ex.ToString()}" + $"duration: {stopwatch.ElapsedMilliseconds}" + - $"message: the partition count and taskhub information are not matching."); + $"message: the task was failed."); } // ensuring semaphore is released. finally @@ -156,7 +157,7 @@ public override async Task RunTask(OrchestrationContext context, Control FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorV1 Stopping OrchestrationInstance:{context.OrchestrationInstance} controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}"); // [Logs] Add log for a heartbeat message from current instance. - FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued" + + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" + From be3818b411833b8686101d30dcf3784014192342 Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Wed, 3 Apr 2024 18:14:26 -0700 Subject: [PATCH 3/9] Removing additional cancellation token. 
---
 .../AzureStorageOrchestrationService.cs       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index dd69becac..9257076f4 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -2087,8 +2087,7 @@ public async Task StartControlQueueHeartbeatMonitorAsync(

         while (!cancellationToken.IsCancellationRequested)
         {
-            var localCancellationTokenSource = new CancellationTokenSource(this.settings.ControlQueueOrchHeartbeatDetectionInterval);
-            var linkedCancelationTokenSrc = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, localCancellationTokenSource.Token);
+            var taskWait = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);

             try
             {
@@ -2117,14 +2116,14 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
                         if ((controlQueueOwnerIds.Count == 0))
                         {
                             // If fetching controlQueueOwnerIds failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed.
-                            await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, linkedCancelationTokenSrc.Token);
+                            await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken);
                         }
                         else
                         {
                             var ownerId = controlQueueOwnerIds[controlQueueName];

                             // Fetch orchestration instance and validate control-queue stuck.
-                            await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, linkedCancelationTokenSrc.Token);
+                            await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken);
                         }
                     });
                 }
@@ -2138,8 +2137,11 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
             }

             // Waiting for detection interval.
-            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, linkedCancelationTokenSrc.Token);
+            await taskWait;
         }
+
+        FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorCancellationRequested " +
+            $"message: cancellation requested, stopping the control-queue heartbeat monitor.");
     }

     /// <inheritdoc/>

From 74aa625e65564479598f745240ca5d397309cce7 Mon Sep 17 00:00:00 2001
From: Pankaj Saini
Date: Thu, 4 Apr 2024 07:00:16 -0700
Subject: [PATCH 4/9] Adding UTs and updating configuration for sample.

---
 .../ControlQueueHelperTests.cs                | 279 ++++++++++++++++++
 samples/DurableTask.Samples/Program.cs        |   6 +-
 .../AzureStorageOrchestrationService.cs       |  39 ++-
 .../IControlQueueHelper.cs                    |   7 +-
 4 files changed, 314 insertions(+), 17 deletions(-)
 create mode 100644 Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs

diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
new file mode 100644
index 000000000..95c68ee37
--- /dev/null
+++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
@@ -0,0 +1,279 @@
+// ----------------------------------------------------------------------------------
+//  Copyright Microsoft Corporation
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//      http://www.apache.org/licenses/LICENSE-2.0
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+// ----------------------------------------------------------------------------------
+
+namespace DurableTask.AzureStorage.Tests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Data;
+    using System.Data.SqlTypes;
+    using System.Linq;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DurableTask.AzureStorage.ControlQueueHeartbeat;
+    using DurableTask.Core;
+    using Microsoft.VisualStudio.TestTools.UnitTesting;
+    using Moq;
+
+    [TestClass]
+    public class ControlQueueHelperTests
+    {
+        IControlQueueHelper controlQueueHelper;
+        AzureStorageOrchestrationService azureStorageOrchestrationService;
+        AzureStorageOrchestrationServiceSettings settings;
+        int partitionCount = 4;
+        Dictionary<string, int> controlQueueNumberToNameMap;
+
+        [TestInitialize]
+        public void Initialize()
+        {
+            settings = new AzureStorageOrchestrationServiceSettings()
+            {
+                StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
+                TaskHubName = TestHelpers.GetTestTaskHubName(),
+                PartitionCount = partitionCount,
+                ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5),
+                ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(5),
+                ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(5),
+            };
+
+            azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings);
+            controlQueueHelper = azureStorageOrchestrationService;
+
+            controlQueueNumberToNameMap = new Dictionary<string, int>();
+
+            for (int i = 0; i < partitionCount; i++)
+            {
+                var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i);
+                controlQueueNumberToNameMap[controlQueueName] = i;
+            }
+        }
+
+        [TestMethod]
+        public async Task StartControlQueueHeartbeatMonitorAsync()
+        {
+            var detectionCount = new Dictionary<string, int>();
+
+            var taskHubClient = new TaskHubClient(azureStorageOrchestrationService);
+            var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService);
+            var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo();
+
+            // Creating dictionary to calculate stuck issue detection count for each control-queue.
+            foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
+            {
+                detectionCount[controlQueueName] = 0;
+            }
+
+            var cancellationTokenSrc = new CancellationTokenSource();
+
+            var taskHandle = controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
+                taskHubClient,
+                taskHubWorker,
+                async (x, y, z, cancelationToken) => { await Task.CompletedTask; },
+                async (workerId, ownerId, isOwner, controlQueueName, instanceid, orchDetectionInfo, cancellationToken) =>
+                {
+                    detectionCount[controlQueueName]++;
+                    await Task.CompletedTask;
+                },
+                cancellationTokenSrc.Token);
+
+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+
+            // Scheduling of all orchestrators should happen.
+            foreach (var instanceId in controlQueueToInstanceInfo.Values)
+            {
+                var orchInstance = await taskHubClient.GetOrchestrationStateAsync(instanceId);
+                Assert.IsNotNull(orchInstance);
+            }
+
+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+
+            // Orchestrator registration completed.
+            var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
+                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
+                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
+                typeof(ControlQueueHeartbeatTaskOrchestratorV1));
+
+            Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });
+
+            // Should trigger delegate for control-queue stuck.
+            foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
+            {
+                Assert.IsTrue(detectionCount[controlQueueName] > 0);
+            }
+
+        }
+
+        [TestMethod]
+        public async Task ScheduleControlQueueHeartbeatOrchestrations()
+        {
+            var utcBefore = DateTime.UtcNow;
+
+            var taskHubClient = new TaskHubClient(azureStorageOrchestrationService);
+            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, true);
+
+            var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo();
+
+            var utcNow = DateTime.UtcNow;
+
+            foreach (var instanceId in controlQueueToInstanceInfo.Values)
+            {
+                var orchInstance = await taskHubClient.GetOrchestrationStateAsync(instanceId);
+
+                Assert.IsTrue(orchInstance.CreatedTime >= utcBefore && orchInstance.CreatedTime <= utcNow);
+            }
+
+            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, false);
+
+            foreach (var instanceId in controlQueueToInstanceInfo.Values)
+            {
+                var orchInstance = await taskHubClient.GetOrchestrationStateAsync(instanceId);
+
+                Assert.IsTrue(orchInstance.CreatedTime >= utcBefore && orchInstance.CreatedTime <= utcNow);
+            }
+        }
+
+        [TestMethod]
+        public void ScheduleControlQueueHeartbeatOrchestrations_InvalidInput()
+        {
+            var settingsMod = new AzureStorageOrchestrationServiceSettings()
+            {
+                StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
+                TaskHubName = TestHelpers.GetTestTaskHubName(),
+                PartitionCount = partitionCount + 1,
+            };
+
+            var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod);
+
+            var taskHubClient = new TaskHubClient(azureStorageOrchestrationServiceMod);
+
+            Assert.ThrowsExceptionAsync<ArgumentNullException>(async () =>
+            {
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(null);
+            });
+
+            IOrchestrationServiceClient orchestrationService = new Mock<IOrchestrationServiceClient>().Object;
+            var taskHubClientDiff = new TaskHubClient(orchestrationService);
+
+            Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
+            {
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClientDiff);
+            });
+
+            Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
+            {
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient);
+            });
+        }
+
+        [TestMethod]
+        public void RegisterControlQueueHeartbeatOrchestration()
+        {
+            var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService);
+            controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
+
+            var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
+                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
+                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
+                typeof(ControlQueueHeartbeatTaskOrchestratorV1));
+
+            Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });
+        }
+
+        [TestMethod]
+        public void RegisterControlQueueHeartbeatOrchestration_InvalidInput()
+        {
+            var settingsMod = new AzureStorageOrchestrationServiceSettings()
+            {
+                StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
+                TaskHubName = TestHelpers.GetTestTaskHubName(),
+                PartitionCount = partitionCount + 1,
+            };
+
+            var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod);
+
+            var taskHubWorker = new TaskHubWorker(azureStorageOrchestrationServiceMod);
+
+            Assert.ThrowsException<ArgumentNullException>(() =>
+            {
+                controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(null, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
+            });
+
+            IOrchestrationService orchestrationService = new Mock<IOrchestrationService>().Object;
+            var taskHubWorkerDiff = new TaskHubWorker(orchestrationService);
+
+            Assert.ThrowsException<InvalidOperationException>(() =>
+            {
+                controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorkerDiff, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
+            });
+
+            Assert.ThrowsException<InvalidOperationException>(() =>
+            {
+                controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
+            });
+        }
+
+        [TestMethod]
+        [DataRow(new int[] { 0, 1, 2, 3 })]
+        [DataRow(new int[] { 2, 3 })]
+        [DataRow(new int[] { 1, 3 })]
+        [DataRow(new int[] { 0, 1 })]
+        [DataRow(new int[] { 0, 2 })]
+        [DataRow(new int[] { 0, 3 })]
+        [DataRow(new int[] { 0 })]
+        [DataRow(new int[] { 1 })]
+        [DataRow(new int[] { 3 })]
+        public async Task GetControlQueueInstanceId(int[] controlQueueNumbers)
+        {
+            Dictionary<int, List<string>> controlQueueNumberToInstanceIds = new Dictionary<int, List<string>>();
+
+            var controlQueueNumbersHashSet = new HashSet<int>();
+
+            foreach (var cQN in controlQueueNumbers)
+            {
+                controlQueueNumbersHashSet.Add(cQN);
+                controlQueueNumberToInstanceIds[cQN] = new List<string>();
+            }
+
+
+            for (int i = 0; i < 100; i++)
+            {
+                var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_");
+
+                var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId);
+                var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name];
+
+                controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId);
+
+                Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber));
+            }
+
+            foreach (var cQN in controlQueueNumbers)
+            {
+                Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0);
+            }
+        }
+
+        [TestMethod]
+        public void GetControlQueueInstanceId_InvalidInput()
+        {
+            Assert.ThrowsException<ArgumentNullException>(() => { controlQueueHelper.GetControlQueueInstanceId(null); });
+            Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>()); });
+            Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { -4 }); });
+            Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { partitionCount }); });
+            Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { partitionCount + 4 }); });
+        }
+
+    }
+}

diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs
index 2185bec5b..3ccb110e2 100644
--- a/samples/DurableTask.Samples/Program.cs
+++ b/samples/DurableTask.Samples/Program.cs
@@ -240,9 +240,9 @@ private static TaskHubWorker TriggerTaskHubWithMonitor(string workerId, string t
                 TaskHubName = taskHubName,
                 UseTablePartitionManagement = true,
                 PartitionCount = 10,
-                ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5),
-                ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(10),
-                ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(10),
+                ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(10),
+                ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(20),
+                ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(30),
                 WorkerId = workerId
             };

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 9257076f4..299c697d3 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -2074,7 +2074,7 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
         Stopwatch stopwatch = Stopwatch.StartNew();

         // Schedule orchestrator instance for each control-queue.
-        await ScheduleControlQueueHeartbeatOrchestrations(taskHubClient, false);
+        await ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, false);

         // Register orchestrator for control-queue heartbeat.
         RegisterControlQueueHeartbeatOrchestration(taskHubWorker, callBackHeartOrchAsync);
@@ -2128,7 +2128,7 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
                 });
             }
         }
-        catch(TaskCanceledException ex)
+        catch (TaskCanceledException ex)
         {
             // [Logs] Add exception details for task cancellation. This implies one iteration took longer than the time provided.
             FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorTaskCanceled " +
@@ -2145,7 +2145,7 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
     }

     /// <inheritdoc/>
-    public async Task ScheduleControlQueueHeartbeatOrchestrations(TaskHubClient taskHubClient, bool force = false)
+    public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, bool force = false)
     {
         // Validate taskhubclient.
         ValidateTaskHubClient(taskHubClient);
@@ -2221,8 +2221,15 @@ public void RegisterControlQueueHeartbeatOrchestration(TaskHubWorker taskHubWork
     }

     /// <inheritdoc/>
-    public string GetControlQueueInstanceId(int[] controlQueueNumbers, string instanceIdPrefix = "")
+    public string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string instanceIdPrefix = "")
     {
+        _ = controlQueueNumbers == null ? throw new ArgumentNullException(nameof(controlQueueNumbers))
+            : controlQueueNumbers.Count == 0 ?
+                throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain at least one element.")
+            : controlQueueNumbers.Any(x => x < 0 || x >= this.settings.PartitionCount) ?
+                throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain values in the range [0, {this.settings.PartitionCount}).")
+            : controlQueueNumbers;
+
         var instanceId = string.Empty;
         int suffix = 0;
         bool foundInstanceId = false;
@@ -2256,7 +2263,12 @@ public string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string instan

     private void ValidateTaskHubWorker(TaskHubWorker taskHubWorker)
     {
-        if (!(taskHubWorker.orchestrationService is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubWorker))
+        if(taskHubWorker == null)
+        {
+            throw new ArgumentNullException(nameof(taskHubWorker));
+        }
+
+        if (!(taskHubWorker.orchestrationService is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubWorker) || azureStorageOrchestrationServiceTaskHubWorker == null)
         {
             throw new InvalidOperationException($"TaskHubWorker is not using AzureStorageOrchestrationService.");
         }
@@ -2270,13 +2282,18 @@ private void ValidateTaskHubWorker(TaskHubWorker taskHubWorker)

     private void ValidateTaskHubClient(TaskHubClient taskHubClient)
     {
-        if (!(taskHubClient.ServiceClient is AzureStorageOrchestrationService azureStorageOrchestrationService))
+        if (taskHubClient == null)
+        {
+            throw new ArgumentNullException(nameof(taskHubClient));
+        }
+
+        if (!(taskHubClient.ServiceClient is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubClient))
         {
             throw new InvalidOperationException($"TaskHubClient is not using AzureStorageOrchestrationService.");
         }

-        if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationService.settings.TaskHubName)
-            && this.settings.PartitionCount == azureStorageOrchestrationService.settings.PartitionCount))
+        if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationServiceTaskHubClient.settings.TaskHubName)
+            && this.settings.PartitionCount == azureStorageOrchestrationServiceTaskHubClient.settings.PartitionCount))
         {
             throw new InvalidOperationException($"TaskHubClient's AzureStorageOrchestrationService has a TaskHubName and/or PartitionCount mismatch.");
         }
@@ -2453,7 +2470,7 @@ private async Task RunCallBack(
         return controlQueueOwnerIds;
     }

-    private Dictionary<string, string> GetControlQueueToInstanceIdInfo()
+    internal Dictionary<string, string> GetControlQueueToInstanceIdInfo()
    {
         var partitionCount = this.settings.PartitionCount;
         var controlQueueOrchInstanceIds = new Dictionary<string, string>();
@@ -2464,7 +2481,7 @@ private Dictionary<string, string> GetControlQueueToInstanceIdInfo()
             var controlQueueName = GetControlQueueName(this.settings.TaskHubName, controlQueueNumber);
             var instanceIdPrefix = $"DTF_PC_{partitionCount}_CQ_{controlQueueNumber}_";

-            string instanceId = GetControlQueueInstanceId(new int[] { controlQueueNumber }, instanceIdPrefix);
+            string instanceId = GetControlQueueInstanceId(new HashSet<int> { controlQueueNumber }, instanceIdPrefix);

             controlQueueOrchInstanceIds[controlQueueName] = instanceId;
         }
@@ -2482,7 +2499,7 @@ private Dictionary<string, string> GetControlQueueToInstanceIdInfo()

     // TODO: Change this to a sticky assignment so that partition count changes can
     // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1
-    async Task<ControlQueue> GetControlQueueAsync(string instanceId)
+    internal async Task<ControlQueue> GetControlQueueAsync(string instanceId)
     {
         uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount;
         string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex);

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
index 680171176..a68dc5868 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
 using DurableTask.Core;
@@ -32,7 +33,7 @@ Task StartControlQueueHeartbeatMonitorAsync(
     /// <param name="taskHubClient">TaskHubClient object.</param>
     /// <param name="force">If true, creates new instances of orchestrator, otherwise creates only if there is no running instance of orchestrator with same instance id.</param>
     /// <returns>Task result.</returns>
-    Task ScheduleControlQueueHeartbeatOrchestrations(TaskHubClient taskHubClient, bool force = false);
+    Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, bool force = false);

     /// <summary>
     /// Adds orchestrator instance of type <see cref="ControlQueueHeartbeatTaskOrchestratorV1"/> to TaskHubWorkerObject.
     /// </summary>
     ...
     void RegisterControlQueueHeartbeatOrchestration(

     /// <summary>
     /// Gets instanceId which is targeted for mentioned control-queue names.
     /// </summary>
-    /// <param name="controlQueueNumbers">Array of controlQueueNumbers.</param>
+    /// <param name="controlQueueNumbers">Collection of controlQueueNumbers.</param>
     /// <param name="instanceIdPrefix">InstanceId prefix.</param>
     /// <returns>InstanceId for control-queue.</returns>
-    string GetControlQueueInstanceId(int[] controlQueueNumbers, string instanceIdPrefix = "");
+    string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string instanceIdPrefix = "");
 }
}

From b6b7b07bfc51da909b4d0c46ab069bd7e405d887 Mon Sep 17 00:00:00 2001
From: Pankaj Saini
Date: Thu, 4 Apr 2024 13:12:43 -0700
Subject: [PATCH 5/9] Adding comments for possible structured logging.

---
 .../AzureStorageOrchestrationService.cs       | 75 +++++++++++++++++--
 ...ControlQueueHeartbeatTaskOrchestratorV1.cs | 38 ++++++++++
 2 files changed, 107 insertions(+), 6 deletions(-)

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 299c697d3..e828917f3 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -2097,12 +2097,33 @@ public async Task StartControlQueueHeartbeatMonitorAsync(

             try
             {
+                var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);
+                var controlQueueOwnerIdsTask = GetControlQueueOwnerIds();
                 // Gets control-queue name to owner id dictionary.
-                controlQueueOwnerIds = await GetControlQueueOwnerIds();
+
+                await Task.WhenAny(delayTask, controlQueueOwnerIdsTask);
+
+                if (!controlQueueOwnerIdsTask.IsCompleted)
+                {
+                    // [Logs] Add log for long running ControlQueueOwnerIdsFetch.
+                    // Structured logging: ControlQueueOwnerIdsFetchTerminated
+                    //  -> TaskHubName: this.settings.TaskHubName
+                    //  -> WorkerId: this.settings.WorkerId
+                    FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" +
+                        $"duration: {stopwatch.ElapsedMilliseconds}" +
+                        $"message: fetching owner ids for control-queues is taking too long to complete.");
+                }
+                else
+                {
+                    controlQueueOwnerIds = controlQueueOwnerIdsTask.Result;
+                }
             }
             catch (Exception ex)
             {
                 // [Logs] Add exception details for failure at fetching owners of control-queue.
+                // Structured logging: ControlQueueOwnerIdsFetchFailed
+                //  -> TaskHubName: this.settings.TaskHubName
+                //  -> WorkerId: this.settings.WorkerId
                 FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" +
                     $"exception: {ex.ToString()}" +
                     $"message: failed to fetch owner ids for control-queues.");
@@ -2131,6 +2152,9 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
         catch (TaskCanceledException ex)
         {
             // [Logs] Add exception details for task cancellation. This implies one iteration took longer than the time provided.
+            // Structured logging: StartControlQueueHeartbeatMonitorTaskCanceled
+            //  -> TaskHubName: this.settings.TaskHubName
+            //  -> WorkerId: this.settings.WorkerId
             FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorTaskCanceled " +
                 $"exception: {ex.ToString()}" +
                 $"message: failed to complete full iteration within time provided.");
@@ -2140,6 +2164,9 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
             await taskWait;
         }

+        // Structured logging: StartControlQueueHeartbeatMonitorCancellationRequested
+        //  -> TaskHubName: this.settings.TaskHubName
+        //  -> WorkerId: this.settings.WorkerId
         FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorCancellationRequested " +
             $"message: cancellation requested, stopping the control-queue heartbeat monitor.");
     }
@@ -2170,6 +2197,10 @@ public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient
             ))
         {
             // [Logs] Orchestration instance already exists with state and orchestration id.
+            // Structured logging: ControlQueueHeartbeatOrchestrationsAlreadyQueued
+            //  -> TaskHubName: this.settings.TaskHubName
+            //  -> WorkerId: this.settings.WorkerId
+            //  -> OrchestratorInstance: state.OrchestrationInstance.
             FileWriter.WriteLogControlQueueMonitor($"ControlQueueHeartbeatOrchestrationsAlreadyQueued" +
                 $"controlQueueName: {controlQueueName}" +
                 $"instanceId: {instanceId}" +
@@ -2213,6 +2244,7 @@ public void RegisterControlQueueHeartbeatOrchestration(TaskHubWorker taskHubWork
             var objectCreator = new NameValueObjectCreator<TaskOrchestration>(ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName, ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion, controlQueueHeartbeatTaskOrchestrator);

+            // Think about if it should be made idempotent too, taskhubworker is independent of current instance of AzStorageOrchService.
             // Registering task orchestration.
             taskHubWorker.AddTaskOrchestrations(objectCreator);
         }
@@ -2263,7 +2295,7 @@ public string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string

     private void ValidateTaskHubWorker(TaskHubWorker taskHubWorker)
     {
-        if(taskHubWorker == null)
+        if (taskHubWorker == null)
         {
             throw new ArgumentNullException(nameof(taskHubWorker));
         }
@@ -2317,8 +2349,12 @@ private async Task ValidateControlQueueOrchestrationAsync(

         if (!orchInstanceTask.IsCompleted)
         {
-            // orchestrator fetch step timed out.
-            FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceNotFound" +
+ // Structured logging: OrchestrationInstanceFetchTimedOut + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> instanceId: instanceId + FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceFetchTimedOut" + $"controlQueueName: {controlQueueName}" + $"instanceId: {instanceId}" + $"message: orchestration instance couldn't fetch in time."); @@ -2326,10 +2362,15 @@ private async Task ValidateControlQueueOrchestrationAsync( // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceNotFound. await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceNotFound, cancellationToken); } + if (orchInstanceTask.Result == null) { // orchestrator instance not found in control-queue.. - FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceFetchTimedOut" + + // Structured logging: OrchestrationInstanceFetchTimedOut + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> instanceId: instanceId + FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceNotFound" + $"controlQueueName: {controlQueueName}" + $"instanceId: {instanceId}" + $"message: orchestration instance not found."); @@ -2349,6 +2390,13 @@ private async Task ValidateControlQueueOrchestrationAsync( if (this.settings.ControlQueueOrchHeartbeatDetectionThreshold < diffInSeconds) { // orchestrator instance not found in control-queue.. + // Structured logging: OrchestrationInstanceStuck + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> instanceId: instanceId + // -> controlQueueName: controlQueueName + // -> lastUpdatedTimeUTC: lastUpdatedTimeUTC.ToLongTimeString() + // -> currentTimeUTC: currentTimeUTC.ToLongTimeString() FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceStuck" + $"controlQueueName: {controlQueueName}" + $"instanceId: {instanceId}" + @@ -2390,6 +2438,12 @@ private async Task RunCallBack( if (!callBackTask.IsCompleted) { // [Logs] Add log for long running callback. + // Structured logging: ControlQueueMonitorCallbackTerminated + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> instanceId: instanceId + // -> controlQueueName: controlQueueName + // -> duration: stopwatch.ElapsedMilliseconds FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackTerminated" + $"controlQueueName: {controlQueueName}" + $"instanceId: {instanceId}" + @@ -2401,10 +2455,18 @@ private async Task RunCallBack( // Not throwing anything beyond this. catch (Exception ex) { + // [Logs] Add log for callback failure. + // Structured logging: ControlQueueMonitorCallbackFailed + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> instanceId: instanceId + // -> controlQueueName: controlQueueName + // -> duration: stopwatch.ElapsedMilliseconds + // -> exception: ex.ToString() FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackFailed " + $"controlQueueName: {controlQueueName}" + $"instanceId: {instanceId}" + - $"Exception: {ex.ToString()} " + + $"Exception: {ex.ToString()} " + $"ElapsedMilliseconds: {stopwatch.ElapsedMilliseconds} "); } // ensuring semaphore is released. 
@@ -2419,6 +2481,7 @@ private async Task RunCallBack(

     private async Task<Dictionary<string, string>> GetControlQueueOwnerIds()
     {
+
         if (this.settings.UseTablePartitionManagement)
         {
             return GetControlQueueOwnerIdsFromTableLeases();

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs
index b0e1818d0..455095279 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs
@@ -55,6 +55,12 @@ public override async Task RunTask(OrchestrationContext context, Control
         if (controlQueueHeartbeatTaskContextInput == null)
         {
             // [Logs] Add log for failure of the orchestrator.
+            // Structured logging: ControlQueueHeartbeatTaskOrchestratorFailed
+            //  -> orchestrationInstance: context.OrchestrationInstance.ToString()
+            //  -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
+            //  -> inputControlQueueHeartbeatTaskContext: null
+            //  -> duration: stopwatchOrch.ElapsedMilliseconds
+            //  -> message: input context for orchestration is null.
             FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorFailed." +
                 $"OrchestrationInstance:{context.OrchestrationInstance} " +
                 $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
@@ -71,6 +77,12 @@ public override async Task RunTask(OrchestrationContext context, Control

         if (!isOrchestratorRunningInCorrectContext)
         {
             // [Logs] Add log for mismatch in context.
+            // Structured logging: ControlQueueHeartbeatTaskOrchestratorFailed
+            //  -> orchestrationInstance: context.OrchestrationInstance.ToString()
+            //  -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
+            //  -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInput.ToString()
+            //  -> duration: stopwatchOrch.ElapsedMilliseconds
+            //  -> message: input and initial contexts for orchestration do not match.
+ // Structured logging: ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback + // -> orchestrationInstance: context.OrchestrationInstance.ToString() + // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> duration: stopwatchOrch.ElapsedMilliseconds FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + @@ -134,6 +156,12 @@ public override async Task RunTask(OrchestrationContext context, Control catch (Exception ex) { // [Logs] Add exception details for callback failure. + // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackFailure + // -> orchestrationInstance: context.OrchestrationInstance.ToString() + // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> duration: stopwatchOrch.ElapsedMilliseconds + // -> exception : ex.ToString() FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackFailure " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + @@ -157,6 +185,11 @@ public override async Task RunTask(OrchestrationContext context, Control FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorV1 Stopping OrchestrationInstance:{context.OrchestrationInstance} controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}"); // [Logs] Add log for a heartbeat message from current instance. + // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued + // -> orchestrationInstance: context.OrchestrationInstance.ToString() + // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> duration: stopwatchOrch.ElapsedMilliseconds FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + @@ -167,6 +200,11 @@ public override async Task RunTask(OrchestrationContext context, Control } // [Logs] Add log for a heartbeat message from current instance. + // Structured logging: ControlQueueHeartbeatTaskOrchestrator + // -> orchestrationInstance: context.OrchestrationInstance.ToString() + // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> duration: stopwatchOrch.ElapsedMilliseconds FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestrator " + $"OrchestrationInstance:{context.OrchestrationInstance} " + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + From 9d8754dcdc6fbbb19d7cbe740afee282c06606cc Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Thu, 4 Apr 2024 14:10:05 -0700 Subject: [PATCH 6/9] minor updates. 
---
 .../ControlQueueHelperTests.cs                | 32 ++++++++++++++++---
 samples/DurableTask.Samples/Program.cs        | 12 +++----
 .../AzureStorageOrchestrationService.cs       |  7 ++++
 3 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
index 95c68ee37..d8293c161 100644
--- a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
+++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
@@ -77,7 +77,7 @@ public async Task StartControlQueueHeartbeatMonitorAsync()

             var cancellationTokenSrc = new CancellationTokenSource();

-            var taskHandle = controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
+            await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
                 taskHubClient,
                 taskHubWorker,
                 async (x, y, z, cancelationToken) => { await Task.CompletedTask; },
@@ -88,8 +88,6 @@ public async Task StartControlQueueHeartbeatMonitorAsync()
                 },
                 cancellationTokenSrc.Token);

-            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
-
             // Scheduling of all orchestrators should happen.
             foreach (var instanceId in controlQueueToInstanceInfo.Values)
             {
@@ -97,8 +95,6 @@ public async Task StartControlQueueHeartbeatMonitorAsync()
                 Assert.IsNotNull(orchInstance);
             }

-            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
-
             // Orchestrator registration completed.
             var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
                 ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
                 ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
                 typeof(ControlQueueHeartbeatTaskOrchestratorV1));

             Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });

+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+
+            var detectionCountDuplicate = new Dictionary<string, int>();
+
             // Should trigger delegate for control-queue stuck.
             foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
             {
+                detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName];
                 Assert.IsTrue(detectionCount[controlQueueName] > 0);
             }

+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+            cancellationTokenSrc.Cancel();
+
+            // Give it some time to cancel the ongoing operations.
+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);
+
+            // Should trigger delegate for control-queue stuck.
+            foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
+            {
+                Assert.IsFalse(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]);
+                detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName];
+            }
+
+            await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+
+            // Should not trigger delegate for control-queue stuck after cancellation.
+            foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
+            {
+                Assert.IsTrue(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]);
+            }
         }

     [TestMethod]

diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs
index 3ccb110e2..20b4484d9 100644
--- a/samples/DurableTask.Samples/Program.cs
+++ b/samples/DurableTask.Samples/Program.cs
@@ -44,7 +44,7 @@ internal class Program
         static ObservableEventListener eventListener;

         [STAThread]
-        static void Main(string[] args)
+        static async Task Main(string[] args)
         {
             eventListener = new ObservableEventListener();
             eventListener.LogToConsole();
@@ -133,9 +133,9 @@ static void Main(string[] args)
                         break;
                     case "ControlQueueHeartbeatMonitor":
                         CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
-                        var taskWorker1 = TriggerTaskHubWithMonitor("workerId1", "taskHub1", cancellationTokenSource.Token);
-                        var taskWorker2 = TriggerTaskHubWithMonitor("workerId2", "taskHub1", cancellationTokenSource.Token);
-                        var taskWorker3 = TriggerTaskHubWithMonitor("WorkerId1", "taskHub1", cancellationTokenSource.Token);
+                        var taskWorker1 = await TriggerTaskHubWithMonitor("workerId1", "taskHub1", cancellationTokenSource.Token);
+                        var taskWorker2 = await TriggerTaskHubWithMonitor("workerId2", "taskHub1", cancellationTokenSource.Token);
+                        var taskWorker3 = await TriggerTaskHubWithMonitor("WorkerId1", "taskHub1", cancellationTokenSource.Token);

                         Task.Delay(TimeSpan.FromMinutes(5)).Wait();
@@ -230,7 +230,7 @@ static void Main(string[] args)
             }
         }

-        private static TaskHubWorker TriggerTaskHubWithMonitor(string workerId, string taskHubName, CancellationToken cancellationToken)
+        private static async Task<TaskHubWorker> TriggerTaskHubWithMonitor(string workerId, string taskHubName, CancellationToken cancellationToken)
         {
             string storageConnectionString = GetSetting("StorageConnectionString");
@@ -251,7 +251,7 @@ private static TaskHubWorker TriggerTaskHubWithMonitor(string workerId, string t
             var taskHubWorker = new TaskHubWorker(orchestrationServiceAndClient);
             var controlQueueHealthMonitor = (IControlQueueHelper)orchestrationServiceAndClient;

-            var task = controlQueueHealthMonitor.StartControlQueueHeartbeatMonitorAsync(
+            await controlQueueHealthMonitor.StartControlQueueHeartbeatMonitorAsync(
                 taskHubClient,
                 taskHubWorker,
                 async (orchestrationInstance, controlQueueHeartbeatTaskInputContext, controlQueueHeartbeatTaskContext, cancellationToken) =>

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index e828917f3..0cfb1ca62 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -2059,6 +2059,8 @@ public Task DownloadBlobAsync(string blobUri)
     #region IControlQueueHelper

+    private Task controlQueueTaskLoop;
+
     /// <inheritdoc/>
     public async Task StartControlQueueHeartbeatMonitorAsync(
@@ -2081,6 +2083,11 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
         // Gets control-queue name to orchestrator instance id dictionary.
         Dictionary<string, string> controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo();

+        controlQueueTaskLoop = StartControlQueueHeartbeatValidationLoop(taskHubClient, callBackControlQueueValidation, stopwatch, controlQueueOrchInstanceIds, cancellationToken);
+    }
+
+    private async Task StartControlQueueHeartbeatValidationLoop(TaskHubClient taskHubClient, Func<string, string, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBackControlQueueValidation, Stopwatch stopwatch, Dictionary<string, string> controlQueueOrchInstanceIds, CancellationToken cancellationToken)
+    {
         // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue.
         await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);

From bf08248145940bbcf00a7d868ff462baa1f1f0db Mon Sep 17 00:00:00 2001
From: Pankaj Saini
Date: Thu, 4 Apr 2024 17:37:56 -0700
Subject: [PATCH 7/9] Moving all callbacks to QueueUserWorkItem and renaming the orchestrator.

---
 .../ControlQueueHelperTests.cs                |  35 +--
 .../AzureStorageOrchestrationService.cs       | 259 +++++++-----------
 ... ControlQueueHeartbeatTaskOrchestrator.cs} |  88 +-----
 .../IControlQueueHelper.cs                    |  11 +-
 4 files changed, 137 insertions(+), 256 deletions(-)
 rename src/DurableTask.AzureStorage/ControlQueueHeartbeat/{ControlQueueHeartbeatTaskOrchestratorV1.cs => ControlQueueHeartbeatTaskOrchestrator.cs} (57%)

diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
index d8293c161..ca0c604b1 100644
--- a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
+++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
@@ -29,15 +29,18 @@ namespace DurableTask.AzureStorage.Tests
     [TestClass]
     public class ControlQueueHelperTests
     {
-        IControlQueueHelper controlQueueHelper;
-        AzureStorageOrchestrationService azureStorageOrchestrationService;
-        AzureStorageOrchestrationServiceSettings settings;
-        int partitionCount = 4;
-        Dictionary<string, int> controlQueueNumberToNameMap;
+        private IControlQueueHelper controlQueueHelper;
+        private AzureStorageOrchestrationService azureStorageOrchestrationService;
+        private AzureStorageOrchestrationServiceSettings settings;
+        private int partitionCount = 4;
+        private Dictionary<string, int> controlQueueNumberToNameMap;
+        private CancellationTokenSource cancellationTokenSource;

         [TestInitialize]
         public void Initialize()
         {
+            cancellationTokenSource = new CancellationTokenSource();
+
             settings = new AzureStorageOrchestrationServiceSettings()
             {
                 StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
@@ -97,9 +100,9 @@ await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(

             // Orchestrator registration completed.
             var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
-                typeof(ControlQueueHeartbeatTaskOrchestratorV1));
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
+                typeof(ControlQueueHeartbeatTaskOrchestrator));

             Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });
@@ -143,7 +146,7 @@ public async Task ScheduleControlQueueHeartbeatOrchestrations()
             var utcBefore = DateTime.UtcNow;

             var taskHubClient = new TaskHubClient(azureStorageOrchestrationService);
-            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, true);
+            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, true);

             var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo();
@@ -156,7 +159,7 @@ public async Task ScheduleControlQueueHeartbeatOrchestrations()
                 Assert.IsTrue(orchInstance.CreatedTime >= utcBefore && orchInstance.CreatedTime <= utcNow);
             }

-            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, false);
+            await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, false);

             foreach (var instanceId in controlQueueToInstanceInfo.Values)
             {
@@ -182,7 +185,7 @@ public void ScheduleControlQueueHeartbeatOrchestrations_InvalidInput()
             Assert.ThrowsExceptionAsync<ArgumentNullException>(async () =>
             {
-                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(null);
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(null, cancellationTokenSource.Token);
             });

             IOrchestrationServiceClient orchestrationService = new Mock<IOrchestrationServiceClient>().Object;
@@ -190,12 +193,12 @@ public void ScheduleControlQueueHeartbeatOrchestrations_InvalidInput()
             Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
             {
-                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClientDiff);
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClientDiff, cancellationTokenSource.Token);
             });

             Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
             {
-                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient);
+                await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token);
             });
         }
@@ -206,9 +209,9 @@ public void RegisterControlQueueHeartbeatOrchestration()
             controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });

             var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
-                typeof(ControlQueueHeartbeatTaskOrchestratorV1));
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
+                typeof(ControlQueueHeartbeatTaskOrchestrator));

             Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });
         }

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 0cfb1ca62..e2d2b74ee 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -15,7 +15,6 @@
 namespace DurableTask.AzureStorage
     using System;
     using System.Collections.Concurrent;
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Diagnostics.Tracing;
     using System.Linq;
     using System.Net;
@@ -186,8 +185,6 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings
                 this.settings.TaskHubName.ToLowerInvariant() + "-applease",
                 this.settings.TaskHubName.ToLowerInvariant() + "-appleaseinfo",
                 this.settings.AppLeaseOptions);
-
-            semaphoreSlimForControlQueueMonitorCallBack = new SemaphoreSlim(this.settings.PartitionCount);
         }

         internal string WorkerId => this.settings.WorkerId;
@@ -2059,8 +2056,6 @@ public Task DownloadBlobAsync(string blobUri)
     #region IControlQueueHelper

-    private Task controlQueueTaskLoop;
-
     /// <inheritdoc/>
     public async Task StartControlQueueHeartbeatMonitorAsync(
@@ -2073,22 +2068,25 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
         ValidateTaskHubClient(taskHubClient);
         ValidateTaskHubWorker(taskHubWorker);

-        Stopwatch stopwatch = Stopwatch.StartNew();
-
         // Schedule orchestrator instance for each control-queue.
-        await ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, false);
+        await ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationToken, false);

         // Register orchestrator for control-queue heartbeat.
         RegisterControlQueueHeartbeatOrchestration(taskHubWorker, callBackHeartOrchAsync);

         // Gets control-queue name to orchestrator instance id dictionary.
         Dictionary<string, string> controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo();
-        controlQueueTaskLoop = StartControlQueueHeartbeatValidationLoop(taskHubClient, callBackControlQueueValidation, stopwatch, controlQueueOrchInstanceIds, cancellationToken);
+
+        // Keeping it alive.
+        controlQueueTaskLoop = StartControlQueueHeartbeatValidationLoop(taskHubClient, callBackControlQueueValidation, controlQueueOrchInstanceIds, cancellationToken);
     }

-    private async Task StartControlQueueHeartbeatValidationLoop(TaskHubClient taskHubClient, Func<string, string, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBackControlQueueValidation, Stopwatch stopwatch, Dictionary<string, string> controlQueueOrchInstanceIds, CancellationToken cancellationToken)
+    private async Task StartControlQueueHeartbeatValidationLoop(
+        TaskHubClient taskHubClient,
+        Func<string, string, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBackControlQueueValidation,
+        Dictionary<string, string> controlQueueOrchInstanceIds,
+        CancellationToken cancellationToken)
     {
-        // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue.
         await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);
@@ -2096,77 +2094,65 @@ private async Task StartControlQueueHeartbeatValidationLoop(TaskHubClient taskHu
         {
             var taskWait = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);

-            try
+            if (!cancellationToken.IsCancellationRequested)
             {
-                if (!cancellationToken.IsCancellationRequested)
+                Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+                try
                 {
-                    Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+                    // Make it time-boxed.
+                    var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);

-                    try
-                    {
-                        var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);
-                        var controlQueueOwnerIdsTask = GetControlQueueOwnerIds();
-                        // Gets control-queue name to owner id dictionary.
+                    // Gets control-queue name to owner id dictionary.
+                    var controlQueueOwnerIdsTask = GetControlQueueOwnerIds();

+                    // Helps keep the fetch time-boxed.
+                    await Task.WhenAny(delayTask, controlQueueOwnerIdsTask);

-                        await Task.WhenAny(delayTask, controlQueueOwnerIdsTask);
-
-                        if (!controlQueueOwnerIdsTask.IsCompleted)
-                        {
-                            // [Logs] Add log for long running ControlQueueOwnerIdsFetch.
-                            // Structured logging: ControlQueueOwnerIdsFetchTerminated
-                            //  -> TaskHubName: this.settings.TaskHubName
-                            //  -> WorkerId: this.settings.WorkerId
-                            FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" +
-                                $"duration: {stopwatch.ElapsedMilliseconds}" +
-                                $"message: fetching owner ids for control-queues is taking too long to complete.");
-                        }
-                        else
-                        {
-                            controlQueueOwnerIds = controlQueueOwnerIdsTask.Result;
-                        }
-                    }
-                    catch (Exception ex)
+                    if (!controlQueueOwnerIdsTask.IsCompleted)
                     {
-                        // [Logs] Add exception details for failure at fetching owners of control-queue.
-                        // Structured logging: ControlQueueOwnerIdsFetchFailed
-                        //  -> TaskHubName: this.settings.TaskHubName
-                        //  -> WorkerId: this.settings.WorkerId
-                        FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" +
-                            $"exception: {ex.ToString()}" +
-                            $"message: failed to fetch owner ids for control-queues.");
+                        // [Logs] Add log for long running ControlQueueOwnerIdsFetch.
+                        // Structured logging: ControlQueueOwnerIdsFetchTerminated
+                        //  -> TaskHubName: this.settings.TaskHubName
+                        //  -> WorkerId: this.settings.WorkerId
+                        FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" +
+                            $"message: fetching owner ids for control-queues is taking too long to complete.");
                     }
+                    else
+                    {
+                        controlQueueOwnerIds = controlQueueOwnerIdsTask.Result;
+                    }
+                }
+                catch (Exception ex)
+                {
+                    // [Logs] Add exception details for failure at fetching owners of control-queue.
+                    // Structured logging: ControlQueueOwnerIdsFetchFailed
+                    //  -> TaskHubName: this.settings.TaskHubName
+                    //  -> WorkerId: this.settings.WorkerId
+                    FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" +
+                        $"exception: {ex.ToString()}" +
+                        $"message: failed to fetch owner ids for control-queues.");
+                }

-                    Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) =>
-                    {
-                        var controlQueueName = controlQueueOrchInstanceId.Key;
-                        var instanceId = controlQueueOrchInstanceId.Value;
+                Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) =>
+                {
+                    var controlQueueName = controlQueueOrchInstanceId.Key;
+                    var instanceId = controlQueueOrchInstanceId.Value;

-                        if ((controlQueueOwnerIds.Count == 0))
-                        {
-                            // If fetching controlQueueOwnerIds failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed.
-                            await RunCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken);
-                        }
-                        else
-                        {
-                            var ownerId = controlQueueOwnerIds[controlQueueName];
+                    if ((controlQueueOwnerIds.Count == 0))
+                    {
+                        // If fetching controlQueueOwnerIds failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed.
+                        QueueCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken);
+                    }
+                    else
+                    {
+                        var ownerId = controlQueueOwnerIds[controlQueueName];

-                            // Fetch orchestration instance and validate control-queue stuck.
-                            await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken);
-                        }
-                    });
-                }
-            }
-            catch (TaskCanceledException ex)
-            {
-                // [Logs] Add exception details for task cancellation. This implies one iteration took longer than the time provided.
-                // Structured logging: StartControlQueueHeartbeatMonitorTaskCanceled
-                //  -> TaskHubName: this.settings.TaskHubName
-                //  -> WorkerId: this.settings.WorkerId
-                FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorTaskCanceled " +
-                    $"exception: {ex.ToString()}" +
-                    $"message: failed to complete full iteration within time provided.");
-            }
+                        // Fetch orchestration instance and validate control-queue stuck.
+                        await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken);
+                    }
+                });
+            }

             // Waiting for detection interval.
             await taskWait;
         }
@@ -2179,7 +2165,7 @@ private async Task StartControlQueueHeartbeatValidationLoop(

     /// <inheritdoc/>
-    public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, bool force = false)
+    public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, CancellationToken cancellationToken, bool force = false)
     {
         // Validate taskhubclient.
         ValidateTaskHubClient(taskHubClient);
@@ -2189,6 +2175,11 @@ public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient

         foreach (var controlQueueOrchInstanceId in controlQueueOrchInstanceIds)
         {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return;
+            }
+
             var controlQueueName = controlQueueOrchInstanceId.Key;
             var instanceId = controlQueueOrchInstanceId.Value;
@@ -2222,8 +2213,8 @@ public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient

             // Creating the orchestration instance.
             await taskHubClient.CreateOrchestrationInstanceAsync(
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName,
-                ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion,
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
+                ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
                 controlQueueOrchInstanceId.Value,
                 orchInput);
         }
@@ -2234,28 +2225,25 @@ public void RegisterControlQueueHeartbeatOrchestration(TaskHubWorker taskHubWork
     {
         ValidateTaskHubWorker(taskHubWorker);

-        if (!isControlQueueOrchestratorsRegistered)
-        {
-            lock (controlQueueOrchestratorsRegistrationLock)
-            {
-                if (!isControlQueueOrchestratorsRegistered)
-                {
-                    // Creating initial context object for orchestration.
-                    // This context will be available for each orchestration run of this type in the taskhubworker.
-                    ControlQueueHeartbeatTaskOrchestratorV1 controlQueueHeartbeatTaskOrchestrator = new ControlQueueHeartbeatTaskOrchestratorV1(
-                        new ControlQueueHeartbeatTaskContext(
-                            this.settings.TaskHubName,
-                            this.settings.PartitionCount),
-                        this.settings.ControlQueueHearbeatOrchestrationInterval,
-                        callBack);
+        // Creating initial context object for orchestration.
+        // This context will be available for each orchestration run of this type in the taskhubworker.
+ ControlQueueHeartbeatTaskOrchestrator controlQueueHeartbeatTaskOrchestrator = new ControlQueueHeartbeatTaskOrchestrator(
+ new ControlQueueHeartbeatTaskContext(
+ this.settings.TaskHubName,
+ this.settings.PartitionCount),
+ this.settings.ControlQueueHearbeatOrchestrationInterval,
+ callBack);

- var objectCreator = new NameValueObjectCreator(ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationName, ControlQueueHeartbeatTaskOrchestratorV1.OrchestrationVersion, controlQueueHeartbeatTaskOrchestrator);
+ var objectCreator = new NameValueObjectCreator(ControlQueueHeartbeatTaskOrchestrator.OrchestrationName, ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion, controlQueueHeartbeatTaskOrchestrator);

- // Think about if it should be made idempotent too, taskhubworker is independent of current instance of AzStorageOrchService.
- // Registering task orchestration.
- taskHubWorker.AddTaskOrchestrations(objectCreator);
- }
- }
+ try
+ {
+ // Registering task orchestration.
+ taskHubWorker.AddTaskOrchestrations(objectCreator);
+ }
+ catch (InvalidOperationException)
+ {
+ // Ignoring it, as orchestration is already registered. This keeps the operation idempotent.
 }
 }

@@ -2367,7 +2355,7 @@ private async Task ValidateControlQueueOrchestrationAsync(
 $"message: orchestration instance couldn't fetch in time.");

 // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceNotFound.
- await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceNotFound, cancellationToken);
+ QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceNotFound, cancellationToken);
 }

 if (orchInstanceTask.Result == null)
@@ -2383,7 +2371,7 @@ private async Task ValidateControlQueueOrchestrationAsync(
 $"message: orchestration instance not found.");

 // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceNotFound.
- await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceFetchTimedOut, cancellationToken);
+ QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceFetchTimedOut, cancellationToken);
 }
 else
 {
@@ -2411,12 +2399,12 @@ private async Task ValidateControlQueueOrchestrationAsync(
 $"currentTimeUTC: {currentTimeUTC.ToLongTimeString()}" +
 $"message: orchestration instance is stuck.");

- await RunCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceStuck, cancellationToken);
+ QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceStuck, cancellationToken);
 }
 }
 }

- private async Task RunCallBack(
+ private void QueueCallBack(
 Func<string, string?, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBack,
 string? ownerId,
 string controlQueueName,
@@ -2424,64 +2412,31 @@ private async Task ValidateControlQueueOrchestrationAsync(
 ControlQueueHeartbeatDetectionInfo controlQueueHeartbeatDetectionInfo,
 CancellationToken cancellationToken)
 {
- Stopwatch stopwatch = Stopwatch.StartNew();
- bool gotSemaphore = false;
+ var isControlQueueOwner = this.settings.WorkerId.Equals(ownerId);

- try
+ if (!cancellationToken.IsCancellationRequested)
 {
- // waiting on semaphore with timeout to avoid thread hogging and starvation.
- gotSemaphore = await semaphoreSlimForControlQueueMonitorCallBack.WaitAsync(this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
-
- var isControlQueueOwner = this.settings.WorkerId.Equals(ownerId);
-
- if (gotSemaphore)
- {
- var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);
- var callBackTask = callBack(this.settings.WorkerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken);
-
- // Do not allow callBackTask to run forever.
- await Task.WhenAll(callBackTask, delayTask);
-
- if (!callBackTask.IsCompleted)
+ // No wait to complete provided delegate. The current orchestrator needs to be very thin and quick to run.
+ bool isQueued = ThreadPool.QueueUserWorkItem(async (_) =>
 {
- // [Logs] Add log for long running callback.
- // Structured logging: ControlQueueMonitorCallbackTerminated
- // -> TaskHubName: this.settings.TaskHubName
- // -> WorkerId: this.settings.WorkerId
- // -> instanceId: instanceId
- // -> controlQueueName: controlQueueName
- // -> duration: stopwatch.ElapsedMilliseconds
- FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackTerminated" +
- $"controlQueueName: {controlQueueName}" +
- $"instanceId: {instanceId}" +
- $"duration: {stopwatch.ElapsedMilliseconds}" +
- $"message: callback is taking too long to cmplete.");
- }
- }
- }
- // Not throwing anything beyond this.
- catch (Exception ex)
- {
- // [Logs] Add log for callback failure.
- // Structured logging: ControlQueueMonitorCallbackFailed
- // -> TaskHubName: this.settings.TaskHubName
- // -> WorkerId: this.settings.WorkerId
- // -> instanceId: instanceId
- // -> controlQueueName: controlQueueName
- // -> duration: stopwatch.ElapsedMilliseconds
- // -> exception: ex.ToString()
- FileWriter.WriteLogControlQueueMonitor($"ControlQueueMonitorCallbackFailed " +
- $"controlQueueName: {controlQueueName}" +
- $"instanceId: {instanceId}" +
- $"Exception: {ex.ToString()} " +
- $"ElapsedMilliseconds: {stopwatch.ElapsedMilliseconds} ");
- }
- // ensuring semaphore is released.
- finally
- {
- if (gotSemaphore)
+ await callBack(this.settings.WorkerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken);
+ });
+
+ if (!isQueued)
 {
- semaphoreSlimForControlQueueMonitorCallBack.Release();
+ // Structured logging: MonitorControlQueueHeartbeatCallbackNotQueued
+ // -> TaskHubName: this.settings.TaskHubName
+ // -> WorkerId: this.settings.WorkerId
+ // -> instanceId: instanceId
+ // -> controlQueueName: controlQueueName
+ // -> isControlQueueOwner: isControlQueueOwner
+ // -> controlQueueHeartbeatDetectionInfo: controlQueueHeartbeatDetectionInfo
+ FileWriter.WriteLogControlQueueMonitor($"MonitorControlQueueHeartbeatCallbackNotQueued" +
+ $"controlQueueName: {controlQueueName}" +
+ $"instanceId: {instanceId}" +
+ $"isControlQueueOwner: {isControlQueueOwner}" +
+ $"controlQueueHeartbeatDetectionInfo: {controlQueueHeartbeatDetectionInfo}" +
+ $"message: callback couldn't be queued.");
 }
 }
 }
@@ -2559,9 +2514,7 @@ internal Dictionary GetControlQueueToInstanceIdInfo()
 return controlQueueOrchInstanceIds;
 }

- private SemaphoreSlim semaphoreSlimForControlQueueMonitorCallBack;
-
- private bool isControlQueueOrchestratorsRegistered = false;
+ private Task controlQueueTaskLoop;

 private object controlQueueOrchestratorsRegistrationLock = new object();

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
similarity index 57%
rename from src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs
rename to src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
index 455095279..0a6dce61e 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestratorV1.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
@@ -10,7 +10,7 @@ namespace DurableTask.AzureStorage.ControlQueueHeartbeat
 /// Control-queue heartbeat orchestrator.
 /// This is supposed to be initialized with ControlQueueHeartbeatTaskContext informing orchestrator about configuration of taskhubworker and heartbeat interval.
 ///
- internal class ControlQueueHeartbeatTaskOrchestratorV1 : TaskOrchestration<string, ControlQueueHeartbeatTaskInputContext>
+ internal class ControlQueueHeartbeatTaskOrchestrator : TaskOrchestration<string, ControlQueueHeartbeatTaskInputContext>
 {
 public const string OrchestrationName = "ControlQueueHeartbeatTaskOrchestrator";
@@ -22,10 +22,10 @@ internal class ControlQueueHeartbeatTaskOrchestratorV1 : TaskOrchestration
 callBack;

- private SemaphoreSlim semaphoreSlim;
+ private CancellationTokenSource cancellationTokenSrc;

 ///
- /// ControlQueueHeartbeatTaskOrchestratorV1 constructor.
+ /// ControlQueueHeartbeatTaskOrchestrator constructor.
 ///
 /// ControlQueueHeartbeatTaskContext object, informs about configuration of taskhubworker orchestrator is running in.
 /// Interval between two heartbeats.
 ///
 /// Throws if provided ControlQueueHeartbeatTaskContext object is null.
- internal ControlQueueHeartbeatTaskOrchestratorV1(
+ internal ControlQueueHeartbeatTaskOrchestrator(
 ControlQueueHeartbeatTaskContext controlQueueHeartbeatTaskContext,
 TimeSpan controlQueueHearbeatOrchestrationInterval,
 Func<OrchestrationInstance, ControlQueueHeartbeatTaskInputContext, ControlQueueHeartbeatTaskContext, CancellationToken, Task> callBack)
@@ -42,8 +42,7 @@ internal ControlQueueHeartbeatTaskOrchestratorV1(
 this.controlQueueHearbeatOrchestrationInterval = controlQueueHearbeatOrchestrationInterval;
 this.callBack = callBack;

- // At worst case, allowing max of 2 heartbeat of a control-queue to run callbacks.
- this.semaphoreSlim = new SemaphoreSlim(2 * controlQueueHeartbeatTaskContext.PartitionCount);
+ this.cancellationTokenSrc = new CancellationTokenSource();
 }

 public override async Task<string> RunTask(OrchestrationContext context, ControlQueueHeartbeatTaskInputContext controlQueueHeartbeatTaskContextInput)
@@ -104,86 +103,11 @@ public override async Task RunTask(OrchestrationContext context, Control
 // No wait to complete provided delegate. The current orchestrator needs to be very thin and quick to run.
 bool isQueued = ThreadPool.QueueUserWorkItem(async (_) =>
 {
- var gotSemaphore = await semaphoreSlim.WaitAsync(controlQueueHearbeatOrchestrationInterval);
- Stopwatch stopwatch = Stopwatch.StartNew();
-
- try
- {
- if (gotSemaphore)
- {
- var cancellationTokenSrc = new CancellationTokenSource();
- var delayTask = Task.Delay(controlQueueHearbeatOrchestrationInterval);
- var callBackTask = this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, cancellationTokenSrc.Token);
-
- // Do not allow callBackTask to run forever.
- await Task.WhenAll(callBackTask, delayTask);
-
- if (!callBackTask.IsCompleted)
- {
- // [Logs] Add log for long running callback.
- // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackTerminated
- // -> orchestrationInstance: context.OrchestrationInstance.ToString()
- // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackTerminated " +
- $"OrchestrationInstance:{context.OrchestrationInstance} " +
- $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
- $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"duration: {stopwatch.ElapsedMilliseconds}" +
- $"message: callback is taking too long to cmplete.");
- }
-
- cancellationTokenSrc.Cancel();
- }
- else
- {
- // [Logs] Add log for too many callbacks running. Share the semaphore-count for #callBacks allowed, and wait time for semaphore; and ask to reduce the run-time for callback.
- // Structured logging: ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback
- // -> orchestrationInstance: context.OrchestrationInstance.ToString()
- // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorTooManyActiveCallback " +
- $"OrchestrationInstance:{context.OrchestrationInstance} " +
- $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
- $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"duration: {stopwatch.ElapsedMilliseconds}" +
- $"message: too many active callbacks, skipping runnning this instance of callback.");
- }
- }
- // Not throwing anything beyond this.
- catch (Exception ex)
- {
- // [Logs] Add exception details for callback failure.
- // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackFailure
- // -> orchestrationInstance: context.OrchestrationInstance.ToString()
- // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
- // -> exception : ex.ToString()
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackFailure " +
- $"OrchestrationInstance:{context.OrchestrationInstance} " +
- $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
- $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"exception: {ex.ToString()}" +
- $"duration: {stopwatch.ElapsedMilliseconds}" +
- $"message: the task was failed.");
- }
- // ensuring semaphore is released.
- finally
- {
- if (gotSemaphore)
- {
- semaphoreSlim.Release();
- }
- }
+ await this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, this.cancellationTokenSrc.Token);
 });

 if (!isQueued)
 {
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorV1 Stopping OrchestrationInstance:{context.OrchestrationInstance} controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}");
-
 // [Logs] Add log for a heartbeat message from current instance.
 // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued
 // -> orchestrationInstance: context.OrchestrationInstance.ToString()

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
index a68dc5868..56c9ca651 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs
@@ -15,7 +15,7 @@ public interface IControlQueueHelper
 ///
 /// TaskHubClient object.
 /// TaskHubWorker object.
- /// Callback to run with each execution of the orchestrator of type ControlQueueHeartbeatTaskOrchestratorV1.
+ /// Callback to run with each execution of the orchestrator of type ControlQueueHeartbeatTaskOrchestrator.
 /// Callback to run each time the monitor detects a fault of type ControlQueueHeartbeatDetectionInfo.
 /// Cancellation token.
 /// Task result.
@@ -28,18 +28,19 @@ Task StartControlQueueHeartbeatMonitorAsync(
 #nullable disable

 ///
- /// Adds orchestrator instances of type ControlQueueHeartbeatTaskOrchestratorV1 for each control queue.
+ /// Adds orchestrator instances of type ControlQueueHeartbeatTaskOrchestrator for each control queue.
 ///
 /// TaskHubClient object.
+ /// CancellationToken.
 /// If true, creates new instances of orchestrator, otherwise creates only if there is no running instance of orchestrator with same instance id.
 /// Task result.
- Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, bool force = false);
+ Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, CancellationToken cancellationToken, bool force = false);

 ///
- /// Adds orchestrator instance of type ControlQueueHeartbeatTaskOrchestratorV1 to TaskHubWorkerObject.
+ /// Adds orchestrator instance of type ControlQueueHeartbeatTaskOrchestrator to TaskHubWorkerObject.
 ///
 /// TaskHubWorker object.
- /// Callback to run with each execution of the orchestrator of type ControlQueueHeartbeatTaskOrchestratorV1.
+ /// Callback to run with each execution of the orchestrator of type ControlQueueHeartbeatTaskOrchestrator.
 void RegisterControlQueueHeartbeatOrchestration(
 TaskHubWorker taskHubWorker,
 Func<OrchestrationInstance, ControlQueueHeartbeatTaskInputContext, ControlQueueHeartbeatTaskContext, CancellationToken, Task> callBackHeartOrchAsync);

From d5698af2c946646c14f2b0a98fa73cfa6774f9a3 Mon Sep 17 00:00:00 2001
From: Pankaj Saini
Date: Thu, 4 Apr 2024 19:11:16 -0700
Subject: [PATCH 8/9] Adding ControlQueueHeartbeatTaskResult enum.

---
 .../ControlQueueHeartbeatTaskOrchestrator.cs | 36 +++----------------
 .../ControlQueueHeartbeatTaskResult.cs       | 10 ++++++
 2 files changed, 15 insertions(+), 31 deletions(-)
 create mode 100644 src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
index 0a6dce61e..b01a324e1 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
@@ -1,5 +1,4 @@
 using System;
-using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 using DurableTask.Core;
@@ -10,7 +9,7 @@ namespace DurableTask.AzureStorage.ControlQueueHeartbeat
 /// Control-queue heartbeat orchestrator.
 /// This is supposed to be initialized with ControlQueueHeartbeatTaskContext informing orchestrator about configuration of taskhubworker and heartbeat interval.
 ///
- internal class ControlQueueHeartbeatTaskOrchestrator : TaskOrchestration<string, ControlQueueHeartbeatTaskInputContext>
+ internal class ControlQueueHeartbeatTaskOrchestrator : TaskOrchestration<ControlQueueHeartbeatTaskResult, ControlQueueHeartbeatTaskInputContext>
 {
 public const string OrchestrationName = "ControlQueueHeartbeatTaskOrchestrator";
@@ -45,11 +44,8 @@ internal ControlQueueHeartbeatTaskOrchestrator(
 this.cancellationTokenSrc = new CancellationTokenSource();
 }

- public override async Task<string> RunTask(OrchestrationContext context, ControlQueueHeartbeatTaskInputContext controlQueueHeartbeatTaskContextInput)
+ public override async Task<ControlQueueHeartbeatTaskResult> RunTask(OrchestrationContext context, ControlQueueHeartbeatTaskInputContext controlQueueHeartbeatTaskContextInput)
 {
- // Stopwatch to calculate time to complete orchestrator.
- Stopwatch stopwatchOrch = Stopwatch.StartNew();
-
 // Checks for input being null and complete gracefully.
 if (controlQueueHeartbeatTaskContextInput == null)
 {
 // [Logs] Add log for orchestrator failing due to a null input context.
 // Structured logging: ControlQueueHeartbeatTaskOrchestratorFailed
 // -> orchestrationInstance: context.OrchestrationInstance.ToString()
 // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
 // -> inputControlQueueHeartbeatTaskContext: null
- // -> duration: stopwatchOrch.ElapsedMilliseconds
 // -> message : input context orchestration is null.
 FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorFailed." +
 $"OrchestrationInstance:{context.OrchestrationInstance} " +
 $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
- $"duration: {stopwatchOrch.ElapsedMilliseconds}" +
 $"message: controlQueueHeartbeatTaskContextInput is null. Completing the orchestration.");

- return "Failed";
+ return ControlQueueHeartbeatTaskResult.InvalidInput;
 }

 var isOrchestratorRunningInCorrectContext = controlQueueHeartbeatTaskContextInput.PartitionCount == controlQueueHeartbeatTaskContextInit.PartitionCount
@@ -80,16 +74,14 @@ public override async Task RunTask(Orchestratio
 // -> orchestrationInstance: context.OrchestrationInstance.ToString()
 // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
 // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
 // -> message : Input and initial context for orchestration do not match.
 FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorContextMismatch" +
 $"OrchestrationInstance:{context.OrchestrationInstance} " +
 $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
 $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"duration: {stopwatchOrch.ElapsedMilliseconds}" +
 $"message: the partition count and taskhub information are not matching.");

- return "Failed";
+ return ControlQueueHeartbeatTaskResult.InputContextMismatch;
 }

 // Waiting for heartbeat orchestration interval.
 {
 await this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, this.cancellationTokenSrc.Token);
 });
-
- if (!isQueued)
- {
- // [Logs] Add log for a heartbeat message from current instance.
- // Structured logging: ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued
- // -> orchestrationInstance: context.OrchestrationInstance.ToString()
- // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
- FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorCallbackNotQueued " +
- $"OrchestrationInstance:{context.OrchestrationInstance} " +
- $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
- $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"duration: {stopwatchOrch.ElapsedMilliseconds}" +
- $"message: Callback for orchestrator could not be queued.");
- }
 }

 // [Logs] Add log for a heartbeat message from current instance.
@@ -128,17 +104,15 @@ public override async Task RunTask(OrchestrationContext context, Control
 // -> orchestrationInstance: context.OrchestrationInstance.ToString()
 // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
 // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
- // -> duration: stopwatchOrch.ElapsedMilliseconds
 FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestrator " +
 $"OrchestrationInstance:{context.OrchestrationInstance} " +
 $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
 $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
- $"duration: {stopwatchOrch.ElapsedMilliseconds}" +
 $"message: Sending signal for control-queue heartbeat.");

 context.ContinueAsNew(controlQueueHeartbeatTaskContextInput);

- return "Succeeded";
+ return ControlQueueHeartbeatTaskResult.Succeeded;
 }
 }
 }

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
new file mode 100644
index 000000000..8e77e7bc3
--- /dev/null
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
@@ -0,0 +1,10 @@
+namespace DurableTask.AzureStorage.ControlQueueHeartbeat
+{
+ internal enum ControlQueueHeartbeatTaskResult
+ {
+ Unknown,
+ Succeeded,
+ InvalidInput,
+ InputContextMismatch
+ }
+}

From 365c7485ec5d7b236eb84823ad10b25616476983 Mon Sep 17 00:00:00 2001
From: Pankaj Saini
Date: Thu, 11 Apr 2024 16:29:35 -0700
Subject: [PATCH 9/9] Addressing PR comments.

---
 .../ControlQueueHelperTests.cs                |   8 +-
 samples/DurableTask.Samples/Program.cs        |   2 +-
 .../AzureStorageOrchestrationService.cs       | 140 ++++++++++--------
 ...zureStorageOrchestrationServiceSettings.cs |   8 +-
 .../ControlQueueHeartbeatTaskOrchestrator.cs  |   1 +
 .../ControlQueueHeartbeatTaskResult.cs        |  18 +++
 6 files changed, 108 insertions(+), 69 deletions(-)

diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
index ca0c604b1..5c81e6f42 100644
--- a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
+++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
@@ -48,7 +48,7 @@ public void Initialize()
 PartitionCount = partitionCount,
 ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5),
 ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(5),
- ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(5),
+ ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(5),
 };

 azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings);
@@ -106,7 +106,7 @@ await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
 Assert.ThrowsException(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });

- await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+ await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);

 var detectionCountDuplicate = new Dictionary();

 }

- await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+ await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);

 cancellationTokenSrc.Cancel();

 // Give it some time to cancel the ongoing operations.
@@ -131,7 +131,7 @@ await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
 detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName];
 }

- await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatDetectionThreshold);
+ await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);

 // Should trigger delegate for control-queue stuck.
 foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)

diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs
index 20b4484d9..16ceb3cba 100644
--- a/samples/DurableTask.Samples/Program.cs
+++ b/samples/DurableTask.Samples/Program.cs
@@ -242,7 +242,7 @@ private static async Task TriggerTaskHubWithMonitor(string worker
 PartitionCount = 10,
 ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(10),
 ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(20),
- ControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(30),
+ ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(30),
 WorkerId = workerId
 };

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index e2d2b74ee..0b5e83a62 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -246,9 +246,9 @@ static void ValidateSettings(AzureStorageOrchestrationServiceSettings settings)
 throw new ArgumentNullException(nameof(settings), $"A {nameof(settings.TaskHubName)} value must be configured in the settings.");
 }

- if (settings.ControlQueueHearbeatOrchestrationInterval > settings.ControlQueueOrchHeartbeatDetectionThreshold)
+ if (settings.ControlQueueHearbeatOrchestrationInterval > settings.ControlQueueOrchHeartbeatLatencyThreshold)
 {
- throw new ArgumentException(nameof(settings), $"{nameof(settings.ControlQueueHearbeatOrchestrationInterval)} must not be more than {nameof(settings.ControlQueueOrchHeartbeatDetectionThreshold)}");
+ throw new ArgumentException(nameof(settings), $"{nameof(settings.ControlQueueHearbeatOrchestrationInterval)} must not be more than {nameof(settings.ControlQueueOrchHeartbeatLatencyThreshold)}");
 }

 // TODO: More validation.
@@ -2064,6 +2064,12 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
 Func callBackControlQueueValidation,
 CancellationToken cancellationToken)
 {
+ if (controlQueueTaskStarted)
+ {
+ // [Logs] Add log for not starting again.
+ return;
+ }
+
 // Validate if taskHubClient and taskHubWorker used is of correct type and settings.
 ValidateTaskHubClient(taskHubClient);
 ValidateTaskHubWorker(taskHubWorker);
@@ -2077,6 +2083,9 @@ public async Task StartControlQueueHeartbeatMonitorAsync(
 // Gets control-queue name to orchestrator instance id dictionary.
 Dictionary<string, string> controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo();

+ // Mark the monitor loop as started.
+ controlQueueTaskStarted = true;
+
 // Keeping it alive.
 controlQueueTaskLoop = StartControlQueueHeartbeatValidationLoop(taskHubClient, callBackControlQueueValidation, controlQueueOrchInstanceIds, cancellationToken);
 }

@@ -2087,81 +2096,90 @@ private async Task StartControlQueueHeartbeatValidationLoop(
 Dictionary<string, string> controlQueueOrchInstanceIds,
 CancellationToken cancellationToken)
 {
- // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue.
- await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);
-
- while (!cancellationToken.IsCancellationRequested)
+ try
 {
- var taskWait = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);
+ // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue.
+ await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);

- if (!cancellationToken.IsCancellationRequested)
+ while (!cancellationToken.IsCancellationRequested && controlQueueTaskStarted)
 {
- Dictionary controlQueueOwnerIds = new Dictionary();
+ var taskWait = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);

- try
+ if (!cancellationToken.IsCancellationRequested)
 {
- // Make it time boxed.
- var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);
+ Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+ try
+ {
+ // Make it time boxed.
+ var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken);

- // Gets control-queue name to owner id dictionary.
- var controlQueueOwnerIdsTask = GetControlQueueOwnerIds();
+ // Gets control-queue name to owner id dictionary.
+ var controlQueueOwnerIdsTask = GetControlQueueOwnerIds();

- // Helps keep the owner-id fetch time-boxed.
- await Task.WhenAny(delayTask, controlQueueOwnerIdsTask);
+ // Helps keep the owner-id fetch time-boxed.
+ await Task.WhenAny(delayTask, controlQueueOwnerIdsTask);

- if (!controlQueueOwnerIdsTask.IsCompleted)
+ if (!controlQueueOwnerIdsTask.IsCompleted)
+ {
+ // [Logs] Add log for long running ControlQueueOwnerIdsFetch.
+ // Structured logging: ControlQueueOwnerIdsFetchTerminated
+ // -> TaskHubName: this.settings.TaskHubName
+ // -> WorkerId: this.settings.WorkerId
+ FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" +
+ $"message: callback is taking too long to complete.");
+ }
+ else
+ {
+ controlQueueOwnerIds = controlQueueOwnerIdsTask.Result;
+ }
+ }
+ catch (Exception ex)
 {
- // [Logs] Add log for long running ControlQueueOwnerIdsFetch.
- // Structured logging: ControlQueueOwnerIdsFetchTerminated
 // -> TaskHubName: this.settings.TaskHubName
 // -> WorkerId: this.settings.WorkerId
- FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" +
- $"message: callback is taking too long to complete.");
+ // [Logs] Add exception details for failure at fetching owners of control-queue.
- // Structured logging: ControlQueueOwnerIdsFetchFailed
- // -> TaskHubName: this.settings.TaskHubName
- // -> WorkerId: this.settings.WorkerId
- FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" +
- $"exception: {ex.ToString()}" +
- $"message: failed to fetch owner ids for control-queues.");
-
- Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) =>
- {
- var controlQueueName = controlQueueOrchInstanceId.Key;
- var instanceId = controlQueueOrchInstanceId.Value;
+ // Structured logging: ControlQueueOwnerIdsFetchFailed
+ // -> TaskHubName: this.settings.TaskHubName
+ // -> WorkerId: this.settings.WorkerId
+ FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" +
+ $"exception: {ex.ToString()}" +
+ $"message: failed to fetch owner ids for control-queues.");
+ }

- if ((controlQueueOwnerIds.Count == 0))
- {
- // If fetching controlQueueOwnerIds failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed.
- QueueCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken);
- }
- else
+ Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) =>
 {
- var ownerId = controlQueueOwnerIds[controlQueueName];
+ var controlQueueName = controlQueueOrchInstanceId.Key;
+ var instanceId = controlQueueOrchInstanceId.Value;

- // Fetch orchestration instance and validate control-queue stuck.
- await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken);
- }
- });
+ if ((controlQueueOwnerIds.Count == 0))
+ {
+ // If fetching controlQueueOwnerIds failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed.
+ QueueCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken);
+ }
+ else
+ {
+ var ownerId = controlQueueOwnerIds[controlQueueName];
+
+ // Fetch orchestration instance and validate control-queue stuck.
+ await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken);
+ }
+ });
+ }

+ // Waiting for detection interval.
+ await taskWait;
 }
- // Waiting for detection interval.
- await taskWait;
 }
+ finally
+ {
+ controlQueueTaskStarted = false;

- // Structured logging: StartControlQueueHeartbeatMonitorCancellationRequested
- // -> TaskHubName: this.settings.TaskHubName
- // -> WorkerId: this.settings.WorkerId
- FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorCancellationRequested " +
- $"message: failed to complete full iteration within time provided.");
+ // Structured logging: StartControlQueueHeartbeatMonitorStopped
+ // -> TaskHubName: this.settings.TaskHubName
+ // -> WorkerId: this.settings.WorkerId
+ // -> IsCancellationRequested = cancellationToken.IsCancellationRequested
+ FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorStopped " +
+ $"message: Stopped. " +
+ $"IsCancellationRequested = {cancellationToken.IsCancellationRequested}");
+ }
 }

 ///
@@ -2382,7 +2400,7 @@ private async Task ValidateControlQueueOrchestrationAsync(
 var diffInSeconds = currentTimeUTC - lastUpdatedTimeUTC;

 // If difference in last updated time and current time is greater than threshold, then log the 'OrchestrationInstanceStuck' and run callback.
- if (this.settings.ControlQueueOrchHeartbeatDetectionThreshold < diffInSeconds)
+ if (this.settings.ControlQueueOrchHeartbeatLatencyThreshold < diffInSeconds)
 {
 // orchestrator instance looks stuck in control-queue.
 // Structured logging: OrchestrationInstanceStuck
@@ -2516,6 +2534,8 @@ internal Dictionary GetControlQueueToInstanceIdInfo()

 private Task controlQueueTaskLoop;

+ private bool controlQueueTaskStarted = false;
+
 private object controlQueueOrchestratorsRegistrationLock = new object();

 #endregion IControlQueueHelper

diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 107946837..18a384e3d 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -36,7 +36,7 @@ public class AzureStorageOrchestrationServiceSettings
 internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(600);

- internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatDetectionThreshold = TimeSpan.FromSeconds(600);
+ internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(600);

 LogHelper logHelper;
@@ -189,14 +189,14 @@ public class AzureStorageOrchestrationServiceSettings
 public TimeSpan ControlQueueHearbeatOrchestrationInterval { get; set; } = DefaultControlQueueHearbeatOrchestrationInterval;

 ///
- /// Time interval between control queue heartbeat orchestration.
+ /// Time interval between control queue heartbeat detection.
 ///
 public TimeSpan ControlQueueOrchHeartbeatDetectionInterval { get; set; } = DefaultControlQueueOrchHeartbeatDetectionInterval;

 ///
- /// Time interval between control queue heartbeat orchestration.
+ /// Time threshold for latency in control queue heartbeat.
 ///
- public TimeSpan ControlQueueOrchHeartbeatDetectionThreshold { get; set; } = DefaultControlQueueOrchHeartbeatDetectionThreshold;
+ public TimeSpan ControlQueueOrchHeartbeatLatencyThreshold { get; set; } = DefaultControlQueueOrchHeartbeatLatencyThreshold;

 ///
 /// If true, takes a lease on the task hub container, allowing for only one app to process messages in a task hub at a time.

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
index b01a324e1..f382caebd 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs
@@ -93,6 +93,7 @@ public override async Task RunTask(Orchestratio
 if (!context.IsReplaying)
 {
 // No wait to complete provided delegate. The current orchestrator needs to be very thin and quick to run.
+ // This is queued independently to avoid long-running callbacks and to safeguard the orchestrator's cadence from failures occurring in callbacks.
 bool isQueued = ThreadPool.QueueUserWorkItem(async (_) =>
 {
 await this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, this.cancellationTokenSrc.Token);
 });

diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
index 8e77e7bc3..828ec5910 100644
--- a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
@@ -2,9 +2,27 @@
 {
 internal enum ControlQueueHeartbeatTaskResult
 {
+ /// <summary>
+ /// Default value of ControlQueueHeartbeatTaskResult.
+ /// </summary>
 Unknown,
+
+ /// <summary>
+ /// Orchestration succeeded.
+ /// </summary>
 Succeeded,
+
+ /// <summary>
+ /// Invalid input.
+ /// Happens when input is either null or not of type ControlQueueHeartbeatTaskInputContext.
+ /// </summary>
 InvalidInput,
+
+ /// <summary>
+ /// Mismatch between the heartbeat orchestration context and the input to the orchestration instance.
+ /// This happens when the taskhubworker is running with a different partition count than the one the orchestration instance was queued with.
+ /// Usually it happens when the partition-count is changed for a taskhub.
+ /// </summary>
 InputContextMismatch
 }
 }
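The patch series never shows the monitor API end to end in one place, so the following is a minimal consumer sketch that pulls the pieces together. It is illustrative only: the delegate parameter lists are inferred from the call sites in these patches (the orchestrator's callBack invocation and QueueCallBack), the IControlQueueHelper cast assumes AzureStorageOrchestrationService implements that interface as the samples suggest, and storageConnectionString is a placeholder. Treat it as a sketch under those assumptions, not as the library's documented usage.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using DurableTask.AzureStorage;
    using DurableTask.AzureStorage.ControlQueueHeartbeat;
    using DurableTask.Core;

    class ControlQueueMonitorSketch
    {
        static async Task RunAsync(string storageConnectionString) // placeholder connection string
        {
            var settings = new AzureStorageOrchestrationServiceSettings
            {
                StorageAccountDetails = new StorageAccountDetails { ConnectionString = storageConnectionString },
                TaskHubName = "taskHub1",
                PartitionCount = 10,
                // The heartbeat interval must not exceed the latency threshold; ValidateSettings enforces this.
                ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(10),
                ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(20),
                ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(30),
                WorkerId = "workerId1"
            };

            var service = new AzureStorageOrchestrationService(settings);
            IControlQueueHelper helper = service; // assumption: the service exposes IControlQueueHelper
            var worker = new TaskHubWorker(service);
            var client = new TaskHubClient(service);
            using var cts = new CancellationTokenSource();

            // Registers the heartbeat orchestrator, schedules one instance per control queue,
            // and starts the detection loop; delegate shapes inferred from the patches above.
            await helper.StartControlQueueHeartbeatMonitorAsync(
                client,
                worker,
                (instance, inputContext, initContext, token) =>
                {
                    // Heartbeat callback: keep it thin, it is queued to the thread pool per heartbeat.
                    return Task.CompletedTask;
                },
                (workerId, ownerId, isOwner, queueName, instanceId, detectionInfo, token) =>
                {
                    // Validation callback: fires when a control queue looks stuck or its owner cannot be fetched.
                    Console.WriteLine($"{queueName} flagged: {detectionInfo} (owner: {ownerId ?? "unknown"})");
                    return Task.CompletedTask;
                },
                cts.Token);

            await worker.StartAsync();
        }
    }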