diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs new file mode 100644 index 000000000..5c81e6f42 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs @@ -0,0 +1,304 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using System; + using System.Collections.Generic; + using System.Data; + using System.Data.SqlTypes; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.AzureStorage.ControlQueueHeartbeat; + using DurableTask.Core; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class ControlQueueHelperTests + { + private IControlQueueHelper controlQueueHelper; + private AzureStorageOrchestrationService azureStorageOrchestrationService; + private AzureStorageOrchestrationServiceSettings settings; + private int partitionCount = 4; + private Dictionary controlQueueNumberToNameMap; + private CancellationTokenSource cancellationTokenSource; + + [TestInitialize] + public void Initialize() + { + cancellationTokenSource = new CancellationTokenSource(); + + settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TestHelpers.GetTestTaskHubName(), + PartitionCount = partitionCount, + ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5), + ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(5), + ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(5), + }; + + azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); + controlQueueHelper = azureStorageOrchestrationService; + + controlQueueNumberToNameMap = new Dictionary(); + + for (int i = 0; i < partitionCount; i++) + { + var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i); + controlQueueNumberToNameMap[controlQueueName] = i; + } + } + + [TestMethod] + public async Task StartControlQueueHeartbeatMonitorAsync() + { + var detectionCount = new Dictionary(); + + var taskHubClient = new TaskHubClient(azureStorageOrchestrationService); + var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService); + var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo(); + + // Creating dictionary to calculate stuck issue detection count for each control-queue. 
+ foreach (var controlQueueName in controlQueueToInstanceInfo.Keys) + { + detectionCount[controlQueueName] = 0; + } + + var cancellationTokenSrc = new CancellationTokenSource(); + + await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync( + taskHubClient, + taskHubWorker, + async (x, y, z, cancelationToken) => { await Task.CompletedTask; }, + async (workerId, ownerId, isOwner, controlQueueName, instanceid, orchDetectionInfo, cancellationToken) => + { + detectionCount[controlQueueName]++; + await Task.CompletedTask; + }, + cancellationTokenSrc.Token); + + // Scheduling of all orchestrator should happen. + foreach (var instanceId in controlQueueToInstanceInfo.Values) + { + var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId); + Assert.IsNotNull(orchIsntance); + } + + // Orchestrator registration completed. + var objectCreator = new NameValueObjectCreator( + ControlQueueHeartbeatTaskOrchestrator.OrchestrationName, + ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion, + typeof(ControlQueueHeartbeatTaskOrchestrator)); + + Assert.ThrowsException(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); }); + + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold); + + var detectionCountDuplicate = new Dictionary(); + + // Should trigger delegate for control-queue stuck. + foreach (var controlQueueName in controlQueueToInstanceInfo.Keys) + { + detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName]; + Assert.IsTrue(detectionCount[controlQueueName] > 0); + } + + + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold); + cancellationTokenSrc.Cancel(); + + // Give it some time to cancel the ongoing operations. + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval); + + // Should trigger delegate for control-queue stuck. + foreach (var controlQueueName in controlQueueToInstanceInfo.Keys) + { + Assert.IsFalse(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]); + detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName]; + } + + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold); + + // Should trigger delegate for control-queue stuck. 
+ foreach (var controlQueueName in controlQueueToInstanceInfo.Keys) + { + Assert.IsTrue(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]); + } + } + + [TestMethod] + public async Task ScheduleControlQueueHeartbeatOrchestrations() + { + var utcBefore = DateTime.UtcNow; + + var taskHubClient = new TaskHubClient(azureStorageOrchestrationService); + await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, true); + + var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo(); + + var utcNow = DateTime.UtcNow; + + foreach (var instanceId in controlQueueToInstanceInfo.Values) + { + var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId); + + Assert.IsTrue(orchIsntance.CreatedTime >= utcBefore && orchIsntance.CreatedTime <= utcNow); + } + + await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, false); + + foreach (var instanceId in controlQueueToInstanceInfo.Values) + { + var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId); + + Assert.IsTrue(orchIsntance.CreatedTime >= utcBefore && orchIsntance.CreatedTime <= utcNow); + } + } + + [TestMethod] + public void ScheduleControlQueueHeartbeatOrchestrations_InvalidInput() + { + var settingsMod = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TestHelpers.GetTestTaskHubName(), + PartitionCount = partitionCount + 1, + }; + + var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod); + + var taskHubClient = new TaskHubClient(azureStorageOrchestrationServiceMod); + + Assert.ThrowsExceptionAsync(async () => + { + await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(null, cancellationTokenSource.Token); + }); + + IOrchestrationServiceClient orchestrationService = new Mock().Object; + var taskHubClientDiff = new TaskHubClient(orchestrationService); + + Assert.ThrowsExceptionAsync(async () => + { + await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClientDiff, cancellationTokenSource.Token); + }); + + Assert.ThrowsExceptionAsync(async () => + { + await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token); + }); + } + + [TestMethod] + public void RegisterControlQueueHeartbeatOrchestration() + { + var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService); + controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; }); + + var objectCreator = new NameValueObjectCreator( + ControlQueueHeartbeatTaskOrchestrator.OrchestrationName, + ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion, + typeof(ControlQueueHeartbeatTaskOrchestrator)); + + Assert.ThrowsException(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); }); + } + + [TestMethod] + public void RegisterControlQueueHeartbeatOrchestration_InvalidInput() + { + var settingsMod = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TestHelpers.GetTestTaskHubName(), + PartitionCount = partitionCount + 1, + }; + + var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod); + + var taskHubWorker 
= new TaskHubWorker(azureStorageOrchestrationServiceMod); + + Assert.ThrowsException(() => + { + controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(null, async (x, y, z, cancellationToken) => { await Task.CompletedTask; }); + }); + + IOrchestrationService orchestrationService = new Mock().Object; + var taskHubWorkerDiff = new TaskHubWorker(orchestrationService); + + Assert.ThrowsException(() => + { + controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorkerDiff, async (x, y, z, cancellationToken) => { await Task.CompletedTask; }); + }); + + Assert.ThrowsException(() => + { + controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; }); + }); + } + + [TestMethod] + [DataRow(new int[] { 0, 1, 2, 3 })] + [DataRow(new int[] { 2, 3 })] + [DataRow(new int[] { 1, 3 })] + [DataRow(new int[] { 0, 1 })] + [DataRow(new int[] { 0, 2 })] + [DataRow(new int[] { 0, 3 })] + [DataRow(new int[] { 0 })] + [DataRow(new int[] { 1 })] + [DataRow(new int[] { 3 })] + public async Task GetControlQueueInstanceId(int[] controlQueueNumbers) + { + Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); + + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNumbers) + { + controlQueueNumbersHashSet.Add(cQN); + controlQueueNumberToInstanceIds[cQN] = new List(); + } + + + for (int i = 0; i < 100; i++) + { + var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); + + var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); + var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; + + controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId); + + Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber)); + } + + foreach (var cQN in controlQueueNumbers) + { + Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); + } + } + + [TestMethod] + public void GetControlQueueInstanceId_InvalidInput() + { + Assert.ThrowsException(() => { controlQueueHelper.GetControlQueueInstanceId(null); }); + Assert.ThrowsException(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet()); }); + Assert.ThrowsException(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet() { -4 }); }); + Assert.ThrowsException(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet() { partitionCount }); }); + Assert.ThrowsException(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet() { partitionCount + 4 }); }); + } + + } +} diff --git a/samples/DurableTask.Samples/Program.cs b/samples/DurableTask.Samples/Program.cs index a246c760d..16ceb3cba 100644 --- a/samples/DurableTask.Samples/Program.cs +++ b/samples/DurableTask.Samples/Program.cs @@ -22,7 +22,9 @@ namespace DurableTask.Samples using System.IO; using System.Linq; using System.Threading; + using System.Threading.Tasks; using DurableTask.AzureStorage; + using DurableTask.AzureStorage.ControlQueueHeartbeat; using DurableTask.Core; using DurableTask.Core.Tracing; using DurableTask.Samples.AverageCalculator; @@ -42,7 +44,7 @@ internal class Program static ObservableEventListener eventListener; [STAThread] - static void Main(string[] args) + static async Task Main(string[] args) { eventListener = new ObservableEventListener(); eventListener.LogToConsole(); @@ -62,7 +64,7 @@ static void Main(string[] args) var orchestrationServiceAndClient = new 
AzureStorageOrchestrationService(settings); var taskHubClient = new TaskHubClient(orchestrationServiceAndClient); var taskHubWorker = new TaskHubWorker(orchestrationServiceAndClient); - + if (ArgumentOptions.CreateHub) { orchestrationServiceAndClient.CreateIfNotExistsAsync().Wait(); @@ -85,12 +87,12 @@ static void Main(string[] args) throw new ArgumentException("parameters"); } - instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(GreetingsOrchestration2), instanceId, + instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(GreetingsOrchestration2), instanceId, int.Parse(ArgumentOptions.Parameters[0])).Result; break; case "Cron": // Sample Input: "0 12 * */2 Mon" - instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(CronOrchestration), instanceId, + instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(CronOrchestration), instanceId, (ArgumentOptions.Parameters != null && ArgumentOptions.Parameters.Length > 0) ? ArgumentOptions.Parameters[0] : null).Result; break; case "Average": @@ -108,9 +110,9 @@ static void Main(string[] args) break; case "SumOfSquares": instance = taskHubClient.CreateOrchestrationInstanceAsync( - "SumOfSquaresOrchestration", - "V1", - instanceId, + "SumOfSquaresOrchestration", + "V1", + instanceId, File.ReadAllText("SumofSquares\\BagOfNumbers.json"), new Dictionary(1) { { "Category", "testing" } }).Result; break; @@ -129,6 +131,19 @@ static void Main(string[] args) instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(MigrateOrchestration), instanceId, new MigrateOrchestrationData { SubscriptionId = "03a1cd39-47ac-4a57-9ff5-a2c2a2a76088", IsDisabled = false }).Result; break; + case "ControlQueueHeartbeatMonitor": + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + var taskWorker1 = await TriggerTaskHubWithMonitor("workerId1", "taskHub1", cancellationTokenSource.Token); + var taskWorker2 = await TriggerTaskHubWithMonitor("workerId2", "taskHub1", cancellationTokenSource.Token); + var taskWorker3 = await TriggerTaskHubWithMonitor("WorkerId1", "taskHub1", cancellationTokenSource.Token); + + Task.Delay(TimeSpan.FromMinutes(5)).Wait(); + + cancellationTokenSource.Cancel(); + taskWorker1.StopAsync().Wait(); + taskWorker2.StopAsync().Wait(); + taskWorker3.StopAsync().Wait(); + break; default: throw new Exception("Unsupported Orchestration Name: " + ArgumentOptions.StartInstance); } @@ -139,7 +154,7 @@ static void Main(string[] args) { Console.WriteLine("Run RaiseEvent"); - if (string.IsNullOrWhiteSpace(ArgumentOptions.InstanceId)) + if (string.IsNullOrWhiteSpace(ArgumentOptions.InstanceId)) { throw new ArgumentException("instanceId"); } @@ -163,10 +178,10 @@ static void Main(string[] args) { taskHubWorker.AddTaskOrchestrations( typeof(GreetingsOrchestration), - typeof(GreetingsOrchestration2), + typeof(GreetingsOrchestration2), typeof(CronOrchestration), - typeof(AverageCalculatorOrchestration), - typeof(ErrorHandlingOrchestration), + typeof(AverageCalculatorOrchestration), + typeof(ErrorHandlingOrchestration), typeof(SignalOrchestration), typeof(MigrateOrchestration), typeof(SumOfSquaresOrchestration) @@ -174,14 +189,14 @@ static void Main(string[] args) taskHubWorker.AddTaskOrchestrations( new NameValueObjectCreator("SumOfSquaresOrchestration", "V1", typeof(SumOfSquaresOrchestration))); - + taskHubWorker.AddTaskActivities( - new GetUserTask(), - new SendGreetingTask(), - new CronTask(), - new ComputeSumTask(), - new GoodTask(), - new BadTask(), + new GetUserTask(), + new 
SendGreetingTask(), + new CronTask(), + new ComputeSumTask(), + new GoodTask(), + new BadTask(), new CleanupTask(), new EmailTask(), new SumOfSquaresTask() @@ -215,6 +230,53 @@ static void Main(string[] args) } } + private static async Task TriggerTaskHubWithMonitor(string workerId, string taskHubName, CancellationToken cancellationToken) + { + string storageConnectionString = GetSetting("StorageConnectionString"); + + var settings = new AzureStorageOrchestrationServiceSettings + { + StorageAccountDetails = new StorageAccountDetails { ConnectionString = storageConnectionString }, + TaskHubName = taskHubName, + UseTablePartitionManagement = true, + PartitionCount = 10, + ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(10), + ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(20), + ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(30), + WorkerId = workerId + }; + + var orchestrationServiceAndClient = new AzureStorageOrchestrationService(settings); + var taskHubClient = new TaskHubClient(orchestrationServiceAndClient); + var taskHubWorker = new TaskHubWorker(orchestrationServiceAndClient); + + var controlQueueHealthMonitor = (IControlQueueHelper)orchestrationServiceAndClient; + await controlQueueHealthMonitor.StartControlQueueHeartbeatMonitorAsync( + taskHubClient, + taskHubWorker, + async (orchestrationInstance, controlQueueHeartbeatTaskInputContext, controlQueueHeartbeatTaskContext, cancellationToken) => + { + FileWriter.WriteLogControlQueueProgram($"Heartbeat coming from instanceId {orchestrationInstance.InstanceId} " + + $"running for controlQueueHeartbeatTaskInputContext = [{controlQueueHeartbeatTaskInputContext}] " + + $"with orchestrator running with = [{controlQueueHeartbeatTaskContext}]."); + + if (cancellationToken.IsCancellationRequested) + { + throw new InvalidOperationException($"Dummy exception."); + } + + await Task.CompletedTask; + }, + async (workerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken) => + { + FileWriter.WriteLogControlQueueProgram($"Act on taskhubworker {workerId} [isControlQueueOwner={isControlQueueOwner}] where control queue {controlQueueName} with controlQueueHeartbeatDetectionInfo {controlQueueHeartbeatDetectionInfo.ToString()} owned by ownerId {ownerId} is either found stuck or too slow, using instanceId {instanceId}."); + await Task.CompletedTask; + }, + cancellationToken); + taskHubWorker.StartAsync().Wait(); + return taskHubWorker; + } + public static string GetSetting(string name) { string value = Environment.GetEnvironmentVariable("DurableTaskTest" + name); diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9f5d15047..0b5e83a62 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -21,6 +21,7 @@ namespace DurableTask.AzureStorage using System.Text; using System.Threading; using System.Threading.Tasks; + using DurableTask.AzureStorage.ControlQueueHeartbeat; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; @@ -40,10 +41,11 @@ namespace DurableTask.AzureStorage public sealed class AzureStorageOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, - IDisposable, + IDisposable, IOrchestrationServiceQueryClient, IOrchestrationServicePurgeClient, - 
IEntityOrchestrationService + IEntityOrchestrationService, + IControlQueueHelper { static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0]; @@ -155,7 +157,7 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement) { - throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager."); + throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager."); } else if (this.settings.UseTablePartitionManagement) { @@ -239,11 +241,16 @@ static void ValidateSettings(AzureStorageOrchestrationServiceSettings settings) throw new ArgumentOutOfRangeException(nameof(settings), "The number of partitions must be a positive integer and no greater than 16."); } - if (string.IsNullOrEmpty(settings.TaskHubName)) + if (string.IsNullOrWhiteSpace(settings.TaskHubName)) { throw new ArgumentNullException(nameof(settings), $"A {nameof(settings.TaskHubName)} value must be configured in the settings."); } + if (settings.ControlQueueHearbeatOrchestrationInterval > settings.ControlQueueOrchHeartbeatLatencyThreshold) + { + throw new ArgumentException(nameof(settings), $"{nameof(settings.ControlQueueHearbeatOrchestrationInterval)} must not be more than {nameof(settings.ControlQueueOrchHeartbeatLatencyThreshold)}"); + } + // TODO: More validation. } @@ -630,7 +637,7 @@ internal static async Task GetControlQueuesAsync( // Need to check for leases in Azure Table Storage. Scale Controller calls into this method. int partitionCount; Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName); - + // Check if table partition manager is used. If so, get partition count from table. // Else, get the partition count from the blobs. if (await partitionTable.ExistsAsync()) @@ -872,12 +879,12 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) var messages = session.CurrentMessageBatch; TraceContextBase parentTraceContext = null; bool foundEventRaised = false; - foreach(var message in messages) + foreach (var message in messages) { if (message.SerializableTraceContext != null) { var traceContext = TraceContextBase.Restore(message.SerializableTraceContext); - switch(message.TaskMessage.Event) + switch (message.TaskMessage.Event) { // Dependency Execution finished. case TaskCompletedEvent tc: @@ -901,15 +908,16 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) break; default: // When internal error happens, multiple message could come, however, it should not be prioritized. 
- if (parentTraceContext == null || + if (parentTraceContext == null || parentTraceContext.OrchestrationTraceContexts.Count < traceContext.OrchestrationTraceContexts.Count) { parentTraceContext = traceContext; } break; - } - } else + } + } + else { // In this case, we set the parentTraceContext later in this method @@ -917,7 +925,7 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) { foundEventRaised = true; } - } + } } // When EventRaisedEvent is present, it will not, out of the box, share the same operation @@ -934,12 +942,12 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) static bool IsActivityOrOrchestrationFailedOrCompleted(IList messages) { - foreach(var message in messages) + foreach (var message in messages) { if (message.TaskMessage.Event is DurableTask.Core.History.SubOrchestrationInstanceCompletedEvent || message.TaskMessage.Event is DurableTask.Core.History.SubOrchestrationInstanceFailedEvent || message.TaskMessage.Event is DurableTask.Core.History.TaskCompletedEvent || - message.TaskMessage.Event is DurableTask.Core.History.TaskFailedEvent || + message.TaskMessage.Event is DurableTask.Core.History.TaskFailedEvent || message.TaskMessage.Event is DurableTask.Core.History.TimerFiredEvent) { return true; @@ -1133,13 +1141,13 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( // Correlation CorrelationTraceClient.Propagate(() => + { + // In case of Extended Session, Emit the Dependency Telemetry. + if (workItem.IsExtendedSession) { - // In case of Extended Session, Emit the Dependency Telemetry. - if (workItem.IsExtendedSession) - { - this.TrackExtendedSessionDependencyTelemetry(session); - } - }); + this.TrackExtendedSessionDependencyTelemetry(session); + } + }); TraceContextBase currentTraceContextBaseOnComplete = null; CorrelationTraceClient.Propagate(() => @@ -1224,7 +1232,7 @@ static bool DependencyTelemetryStarted( TaskMessage continuedAsNewMessage, OrchestrationState orchestrationState) { - return + return (outboundMessages.Count != 0 || orchestratorMessages.Count != 0 || timerMessages.Count != 0) && (orchestrationState.OrchestrationStatus != OrchestrationStatus.Completed) && (orchestrationState.OrchestrationStatus != OrchestrationStatus.Failed); @@ -1266,7 +1274,7 @@ static TraceContextBase CreateOrRestoreRequestTraceContextWithDependencyTracking bool dependencyTelemetryStarted) { TraceContextBase currentTraceContextBaseOnComplete = null; - + if (dependencyTelemetryStarted) { // DependencyTelemetry will be included on an outbound queue @@ -1276,7 +1284,7 @@ static TraceContextBase CreateOrRestoreRequestTraceContextWithDependencyTracking } else { - switch(orchestrationState.OrchestrationStatus) + switch (orchestrationState.OrchestrationStatus) { case OrchestrationStatus.Completed: case OrchestrationStatus.Failed: @@ -1692,7 +1700,7 @@ public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, Orch { throw new InvalidOperationException($"An Orchestration instance with the status {existingInstance.State.OrchestrationStatus} already exists."); } - + return; } @@ -1993,7 +2001,7 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync while (!cancellationToken.IsCancellationRequested) { OrchestrationState? 
state = await this.GetOrchestrationStateAsync(instanceId, executionId); - + if (state != null && state.OrchestrationStatus != OrchestrationStatus.Running && state.OrchestrationStatus != OrchestrationStatus.Suspended && @@ -2007,9 +2015,9 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync } return state; } - + timeout -= statusPollingInterval; - + // For a user-provided timeout of `TimeSpan.Zero`, // we want to check the status of the orchestration once and then return. // Therefore, we check the timeout condition after the status check. @@ -2046,9 +2054,495 @@ public Task DownloadBlobAsync(string blobUri) #endregion + #region IControlQueueHelper + + /// + public async Task StartControlQueueHeartbeatMonitorAsync( + TaskHubClient taskHubClient, + TaskHubWorker taskHubWorker, + Func callBackHeartOrchAsync, + Func callBackControlQueueValidation, + CancellationToken cancellationToken) + { + if(controlQueueTaskStarted) + { + // [Logs] Add log for not starting again. + return; + } + + // Validate if taskHubClient and taskHubWorker used is of correct type and settings. + ValidateTaskHubClient(taskHubClient); + ValidateTaskHubWorker(taskHubWorker); + + // Schedule orchestrator instance for each control-queue. + await ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationToken, false); + + // Register orchestrator for control-queue heartbeat. + RegisterControlQueueHeartbeatOrchestration(taskHubWorker, callBackHeartOrchAsync); + + // Gets control-queue name to orchestrator instance id dictionary. + Dictionary controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo(); + + // started + controlQueueTaskStarted = true; + + // Keeping it alive. + controlQueueTaskLoop = StartControlQueueHeartbeatValidationLoop(taskHubClient, callBackControlQueueValidation, controlQueueOrchInstanceIds, cancellationToken); + } + + private async Task StartControlQueueHeartbeatValidationLoop( + TaskHubClient taskHubClient, + Func callBackControlQueueValidation, + Dictionary controlQueueOrchInstanceIds, + CancellationToken cancellationToken) + { + try + { + // Waiting for detection interval initial to give time to worker to allow some draining of messages from control-queue. + await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken); + + while (!cancellationToken.IsCancellationRequested && controlQueueTaskStarted) + { + var taskWait = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval); + + if (!cancellationToken.IsCancellationRequested) + { + Dictionary controlQueueOwnerIds = new Dictionary(); + + try + { + // Make it time boxed. + var delayTask = Task.Delay(this.settings.ControlQueueHearbeatOrchestrationInterval, cancellationToken); + + // Gets control-queue name to owner id dictionary. + var controlQueueOwnerIdsTask = GetControlQueueOwnerIds(); + + // helps in deciding its time boxed. + await Task.WhenAny(delayTask, controlQueueOwnerIdsTask); + + if (!controlQueueOwnerIdsTask.IsCompleted) + { + // [Logs] Add log for long running ControlQueueOwnerIdsFetch. + // Structured logging: ControlQueueOwnerIdsFetchTerminated + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchTerminated" + + $"message: callback is taking too long to cmplete."); + } + else + { + controlQueueOwnerIds = controlQueueOwnerIdsTask.Result; + } + } + catch (Exception ex) + { + // [Logs] Add exception details for failure at fetching owners of control-queue. 
+ // Structured logging: ControlQueueOwnerIdsFetchFailed + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + FileWriter.WriteLogControlQueueMonitor($"ControlQueueOwnerIdsFetchFailed" + + $"exception: {ex.ToString()}" + + $"message: failed to fetch owner ids for control-queues."); + } + + Parallel.ForEach(controlQueueOrchInstanceIds, async (controlQueueOrchInstanceId) => + { + var controlQueueName = controlQueueOrchInstanceId.Key; + var instanceId = controlQueueOrchInstanceId.Value; + + if ((controlQueueOwnerIds.Count == 0)) + { + // If controlQueueOwnerIds was failed, run the callback with ownerId as null and ControlQueueHeartbeatDetectionInfo as ControlQueueOwnerFetchFailed. + QueueCallBack(callBackControlQueueValidation, null, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.ControlQueueOwnerFetchFailed, cancellationToken); + } + else + { + var ownerId = controlQueueOwnerIds[controlQueueName]; + + // Fetch orchestration instance and validate control-queue stuck. + await ValidateControlQueueOrchestrationAsync(taskHubClient, callBackControlQueueValidation, ownerId, controlQueueName, instanceId, cancellationToken); + } + }); + } + // Waiting for detection interval. + await taskWait; + } + } + finally + { + controlQueueTaskStarted = false; + + // Structured logging: StartControlQueueHeartbeatMonitorStopped + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> IsCancellationRequested = cancellationToken.IsCancellationRequested + FileWriter.WriteLogControlQueueMonitor($"StartControlQueueHeartbeatMonitorCancellationRequested " + + $"message: Stopped. " + + $"IsCancellationRequested = {cancellationToken.IsCancellationRequested}"); + } + } + + /// + public async Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, CancellationToken cancellationToken, bool force = false) + { + // Validate taskhubclient. + ValidateTaskHubClient(taskHubClient); + + // Get control-queue to instance-id map. + Dictionary controlQueueOrchInstanceIds = GetControlQueueToInstanceIdInfo(); + + foreach (var controlQueueOrchInstanceId in controlQueueOrchInstanceIds) + { + if(cancellationToken.IsCancellationRequested) + { + return; + } + + var controlQueueName = controlQueueOrchInstanceId.Key; + var instanceId = controlQueueOrchInstanceId.Value; + + if (!force) + { + var state = await taskHubClient.GetOrchestrationStateAsync(instanceId); + + // if the orchestration instance is not completed, then don't add orchestration instance again. + if (state != null && ( + state.OrchestrationStatus == OrchestrationStatus.Pending + || state.OrchestrationStatus == OrchestrationStatus.Running + || state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew + )) + { + // [Logs] Orchestration instance already exists with state and orchestration id. + // Structured logging: ControlQueueHeartbeatOrchestrationsAlreadyQueued + // -> TaskHubName: this.settings.TaskHubName + // -> WorkerId: this.settings.WorkerId + // -> OrchestratorInstance: state.OrchestrationInstance. + FileWriter.WriteLogControlQueueMonitor($"ControlQueueHeartbeatOrchestrationsAlreadyQueued" + + $"controlQueueName: {controlQueueName}" + + $"instanceId: {instanceId}" + + $"message: orchestration already present in incomplete state."); + + continue; + } + } + + // Creating input context for control queue with control-queue name, taskhub name, and partition count. 
+                var orchInput = new ControlQueueHeartbeatTaskInputContext(controlQueueName, this.settings.TaskHubName, this.settings.PartitionCount);
+
+                // Creating the orchestration instance.
+                await taskHubClient.CreateOrchestrationInstanceAsync(
+                    ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
+                    ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
+                    controlQueueOrchInstanceId.Value,
+                    orchInput);
+            }
+        }
+
+        /// <inheritdoc/>
+        public void RegisterControlQueueHeartbeatOrchestration(TaskHubWorker taskHubWorker, Func<OrchestrationInstance, ControlQueueHeartbeatTaskInputContext, ControlQueueHeartbeatTaskContext, CancellationToken, Task> callBack)
+        {
+            ValidateTaskHubWorker(taskHubWorker);
+
+            // Creating the initial context object for the orchestration.
+            // This context is available to every orchestration of this type that runs in the TaskHubWorker.
+            ControlQueueHeartbeatTaskOrchestrator controlQueueHeartbeatTaskOrchestrator = new ControlQueueHeartbeatTaskOrchestrator(
+                new ControlQueueHeartbeatTaskContext(
+                    this.settings.TaskHubName,
+                    this.settings.PartitionCount),
+                this.settings.ControlQueueHearbeatOrchestrationInterval,
+                callBack);
+
+            var objectCreator = new NameValueObjectCreator<TaskOrchestration>(ControlQueueHeartbeatTaskOrchestrator.OrchestrationName, ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion, controlQueueHeartbeatTaskOrchestrator);
+
+            try
+            {
+                // Registering the task orchestration.
+                taskHubWorker.AddTaskOrchestrations(objectCreator);
+            }
+            catch (InvalidOperationException)
+            {
+                // Ignored: the orchestration is already registered. This keeps the operation idempotent.
+            }
+        }
+
+        /// <inheritdoc/>
+        public string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string instanceIdPrefix = "")
+        {
+            _ = controlQueueNumbers == null ? throw new ArgumentNullException(nameof(controlQueueNumbers))
+                : controlQueueNumbers.Count == 0 ?
+                    throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain at least one element.")
+                : controlQueueNumbers.Any(x => x < 0 || x >= this.settings.PartitionCount) ?
+                    throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain values in range [0, {this.settings.PartitionCount - 1}].")
+                : controlQueueNumbers;
+
+            var instanceId = string.Empty;
+            int suffix = 0;
+            bool foundInstanceId = false;
+
+            var partitionCount = this.settings.PartitionCount;
+
+            // Validating that every control-queue number is in range.
+            foreach (var controlQueueNumber in controlQueueNumbers)
+            {
+                if (controlQueueNumber < 0 || controlQueueNumber >= partitionCount)
+                {
+                    throw new InvalidOperationException($"{nameof(controlQueueNumbers)} has at least one value which is not in range of [0,{partitionCount - 1}] with partition count = {partitionCount}.");
+                }
+            }
+
+            // Increment the suffix until the candidate instance id hashes to one of the requested control queues.
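+            // Illustrative example (hypothetical values): with partitionCount = 4 and controlQueueNumbers = { 2 },
+            // candidates "prefix_1", "prefix_2", ... are tried until Fnv1aHashHelper.ComputeHash(candidate) % 4 == 2,
+            // and that candidate is returned as the instance id.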
+            while (!foundInstanceId)
+            {
+                suffix++;
+                instanceId = $"{instanceIdPrefix}{suffix}";
+                var controlQueueNumber = (int)Fnv1aHashHelper.ComputeHash(instanceId) % partitionCount;
+
+                if (controlQueueNumbers.Any(x => x == controlQueueNumber))
+                {
+                    foundInstanceId = true;
+                }
+            }
+
+            return instanceId;
+        }
+
+        private void ValidateTaskHubWorker(TaskHubWorker taskHubWorker)
+        {
+            if (taskHubWorker == null)
+            {
+                throw new ArgumentNullException(nameof(taskHubWorker));
+            }
+
+            if (!(taskHubWorker.orchestrationService is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubWorker) || azureStorageOrchestrationServiceTaskHubWorker == null)
+            {
+                throw new InvalidOperationException($"TaskHubWorker is not using AzureStorageOrchestrationService.");
+            }
+
+            if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationServiceTaskHubWorker.settings.TaskHubName)
+                && this.settings.PartitionCount == azureStorageOrchestrationServiceTaskHubWorker.settings.PartitionCount))
+            {
+                throw new InvalidOperationException($"The TaskHubWorker's AzureStorageOrchestrationService has a TaskHubName and/or PartitionCount mismatch.");
+            }
+        }
+
+        private void ValidateTaskHubClient(TaskHubClient taskHubClient)
+        {
+            if (taskHubClient == null)
+            {
+                throw new ArgumentNullException(nameof(taskHubClient));
+            }
+
+            if (!(taskHubClient.ServiceClient is AzureStorageOrchestrationService azureStorageOrchestrationServiceTaskHubClient))
+            {
+                throw new InvalidOperationException($"TaskHubClient is not using AzureStorageOrchestrationService.");
+            }
+
+            if (!(this.settings.TaskHubName.Equals(azureStorageOrchestrationServiceTaskHubClient.settings.TaskHubName)
+                && this.settings.PartitionCount == azureStorageOrchestrationServiceTaskHubClient.settings.PartitionCount))
+            {
+                throw new InvalidOperationException($"The TaskHubClient's AzureStorageOrchestrationService has a TaskHubName and/or PartitionCount mismatch.");
+            }
+        }
+
+        private async Task ValidateControlQueueOrchestrationAsync(
+            TaskHubClient taskHubClient,
+            Func<string, string?, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBack,
+            string? ownerId,
+            string controlQueueName,
+            string instanceId,
+            CancellationToken cancellationToken)
+        {
+            DateTime currentTimeUTC = DateTime.UtcNow;
+
+            // Make it time boxed.
+            var timeBoxedActivity = Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval, cancellationToken);
+            var orchInstanceTask = taskHubClient.GetOrchestrationStateAsync(instanceId);
+
+            await Task.WhenAny(timeBoxedActivity, orchInstanceTask);
+
+            if (!orchInstanceTask.IsCompleted)
+            {
+                // The orchestration-instance fetch step timed out.
+                // Structured logging: OrchestrationInstanceFetchTimedOut
+                // -> TaskHubName: this.settings.TaskHubName
+                // -> WorkerId: this.settings.WorkerId
+                // -> instanceId: instanceId
+                FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceFetchTimedOut" +
+                    $"controlQueueName: {controlQueueName}" +
+                    $"instanceId: {instanceId}" +
+                    $"message: orchestration instance couldn't be fetched in time.");
+
+                // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceFetchTimedOut.
+                QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceFetchTimedOut, cancellationToken);
+
+                // Return without touching orchInstanceTask.Result: the task has not completed yet and accessing Result would block.
+                return;
+            }
+
+            if (orchInstanceTask.Result == null)
+            {
+                // The expected orchestration instance for this control-queue was not found.
+                // Structured logging: OrchestrationInstanceNotFound
+                // -> TaskHubName: this.settings.TaskHubName
+                // -> WorkerId: this.settings.WorkerId
+                // -> instanceId: instanceId
+                FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceNotFound" +
+                    $"controlQueueName: {controlQueueName}" +
+                    $"instanceId: {instanceId}" +
+                    $"message: orchestration instance not found.");
+
+                // Run the callback with ownerId and ControlQueueHeartbeatDetectionInfo as OrchestrationInstanceNotFound.
+                QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceNotFound, cancellationToken);
+            }
+            else
+            {
+                var orchInstance = orchInstanceTask.Result;
+
+                var lastUpdatedTimeUTC = orchInstance.LastUpdatedTime;
+
+                var timeSinceLastUpdate = currentTimeUTC - lastUpdatedTimeUTC;
+
+                // If the time since the last update exceeds the latency threshold, log 'OrchestrationInstanceStuck' and run the callback.
+                if (this.settings.ControlQueueOrchHeartbeatLatencyThreshold < timeSinceLastUpdate)
+                {
+                    // The heartbeat orchestration on this control-queue has not made progress within the threshold.
+                    // Structured logging: OrchestrationInstanceStuck
+                    // -> TaskHubName: this.settings.TaskHubName
+                    // -> WorkerId: this.settings.WorkerId
+                    // -> instanceId: instanceId
+                    // -> controlQueueName: controlQueueName
+                    // -> lastUpdatedTimeUTC: lastUpdatedTimeUTC.ToLongTimeString()
+                    // -> currentTimeUTC: currentTimeUTC.ToLongTimeString()
+                    FileWriter.WriteLogControlQueueMonitor($"OrchestrationInstanceStuck" +
+                        $"controlQueueName: {controlQueueName}" +
+                        $"instanceId: {instanceId}" +
+                        $"lastUpdatedTimeUTC: {lastUpdatedTimeUTC.ToLongTimeString()}" +
+                        $"currentTimeUTC: {currentTimeUTC.ToLongTimeString()}" +
+                        $"message: orchestration instance is stuck.");
+
+                    QueueCallBack(callBack, ownerId, controlQueueName, instanceId, ControlQueueHeartbeatDetectionInfo.OrchestrationInstanceStuck, cancellationToken);
+                }
+            }
+        }
+
+        private void QueueCallBack(
+            Func<string, string?, bool, string, string, ControlQueueHeartbeatDetectionInfo, CancellationToken, Task> callBack,
+            string? ownerId,
+            string controlQueueName,
+            string instanceId,
+            ControlQueueHeartbeatDetectionInfo controlQueueHeartbeatDetectionInfo,
+            CancellationToken cancellationToken)
+        {
+            var isControlQueueOwner = this.settings.WorkerId.Equals(ownerId);
+
+            if (!cancellationToken.IsCancellationRequested)
+            {
+                // Do not wait for the provided delegate to complete; the monitoring loop needs to stay thin and quick.
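+                // Note: the delegate runs as an async void-style work item on the thread pool, so exceptions it throws
+                // are not observed here; callbacks should be short and handle their own failures.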
+                bool isQueued = ThreadPool.QueueUserWorkItem(async (_) =>
+                {
+                    await callBack(this.settings.WorkerId, ownerId, isControlQueueOwner, controlQueueName, instanceId, controlQueueHeartbeatDetectionInfo, cancellationToken);
+                });
+
+                if (!isQueued)
+                {
+                    // Structured logging: MonitorControlQueueHeartbeatCallbackNotQueued
+                    // -> TaskHubName: this.settings.TaskHubName
+                    // -> WorkerId: this.settings.WorkerId
+                    // -> instanceId: instanceId
+                    // -> controlQueueName: controlQueueName
+                    // -> isControlQueueOwner: isControlQueueOwner
+                    // -> controlQueueHeartbeatDetectionInfo: controlQueueHeartbeatDetectionInfo
+                    FileWriter.WriteLogControlQueueMonitor($"MonitorControlQueueHeartbeatCallbackNotQueued" +
+                        $"controlQueueName: {controlQueueName}" +
+                        $"instanceId: {instanceId}" +
+                        $"isControlQueueOwner: {isControlQueueOwner}" +
+                        $"controlQueueHeartbeatDetectionInfo: {controlQueueHeartbeatDetectionInfo}" +
+                        $"message: callback couldn't be queued.");
+                }
+            }
+        }
+
+        private async Task<Dictionary<string, string>> GetControlQueueOwnerIds()
+        {
+            if (this.settings.UseTablePartitionManagement)
+            {
+                return GetControlQueueOwnerIdsFromTableLeases();
+            }
+            else
+            {
+                return await GetControlQueueOwnerIdsFromBlobLeasesAsync();
+            }
+        }
+
+        private Dictionary<string, string> GetControlQueueOwnerIdsFromTableLeases()
+        {
+            // Get table leases.
+            var ownershipLeases = this.ListTableLeases();
+
+            string ownershipInfo = string.Empty;
+
+            Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+            // Transform the table leases into a control-queue name to owner-id map.
+            foreach (var ownershipLease in ownershipLeases)
+            {
+                var controlQueueName = ownershipLease.RowKey ?? string.Empty;
+                controlQueueOwnerIds[controlQueueName] = ownershipLease.CurrentOwner;
+                ownershipInfo += $"[PartitionId={ownershipLease.RowKey}, Owner={ownershipLease.CurrentOwner}, NextOwner={ownershipLease.NextOwner}, OwnedSince={ownershipLease.OwnedSince}]";
+            }
+
+            FileWriter.WriteLogControlQueueMonitor($"OwnershipInfo for all control queues: {ownershipInfo}");
+            return controlQueueOwnerIds;
+        }
+
+        private async Task<Dictionary<string, string>> GetControlQueueOwnerIdsFromBlobLeasesAsync()
+        {
+            // Get blob leases.
+            var ownershipLeases = await this.ListBlobLeasesAsync();
+
+            string ownershipInfo = string.Empty;
+
+            Dictionary<string, string> controlQueueOwnerIds = new Dictionary<string, string>();
+
+            // Transform the blob leases into a control-queue name to owner-id map.
+            foreach (var ownershipLease in ownershipLeases)
+            {
+                controlQueueOwnerIds[ownershipLease.PartitionId] = ownershipLease.Owner;
+                ownershipInfo += $"[PartitionId={ownershipLease.PartitionId}, Owner={ownershipLease.Owner}, Token={ownershipLease.Token}, Epoch={ownershipLease.Epoch}]";
+            }
+
+            FileWriter.WriteLogControlQueueMonitor($"OwnershipInfo for all control queues: {ownershipInfo}");
+            return controlQueueOwnerIds;
+        }
+
+        internal Dictionary<string, string> GetControlQueueToInstanceIdInfo()
+        {
+            var partitionCount = this.settings.PartitionCount;
+            var controlQueueOrchInstanceIds = new Dictionary<string, string>();
+
+            // Generate the control-queue name to instance id map.
+ for (int controlQueueNumber = 0; controlQueueNumber < partitionCount; controlQueueNumber++) + { + var controlQueueName = GetControlQueueName(this.settings.TaskHubName, controlQueueNumber); + var instanceIdPrefix = $"DTF_PC_{partitionCount}_CQ_{controlQueueNumber}_"; + + string instanceId = GetControlQueueInstanceId(new HashSet { controlQueueNumber }, instanceIdPrefix); + + controlQueueOrchInstanceIds[controlQueueName] = instanceId; + } + + return controlQueueOrchInstanceIds; + } + + private Task controlQueueTaskLoop; + + private bool controlQueueTaskStarted = false; + + private object controlQueueOrchestratorsRegistrationLock = new object(); + + #endregion IControlQueueHelper + // TODO: Change this to a sticky assignment so that partition count changes can // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1 - async Task GetControlQueueAsync(string instanceId) + internal async Task GetControlQueueAsync(string instanceId) { uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex); diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..18a384e3d 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -32,6 +32,12 @@ public class AzureStorageOrchestrationServiceSettings internal static readonly TimeSpan DefaultMaxQueuePollingInterval = TimeSpan.FromSeconds(30); + internal static readonly TimeSpan DefaultControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(60); + + internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(600); + + internal static readonly TimeSpan DefaultControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(600); + LogHelper logHelper; /// @@ -177,6 +183,21 @@ public class AzureStorageOrchestrationServiceSettings /// public TimeSpan MaxQueuePollingInterval { get; set; } = DefaultMaxQueuePollingInterval; + /// + /// Time interval between control queue heartbeat orchestration. + /// + public TimeSpan ControlQueueHearbeatOrchestrationInterval { get; set; } = DefaultControlQueueHearbeatOrchestrationInterval; + + /// + /// Time interval between control queue heartbeat detection. + /// + public TimeSpan ControlQueueOrchHeartbeatDetectionInterval { get; set; } = DefaultControlQueueOrchHeartbeatDetectionInterval; + + /// + /// Time threshold for latency in control queue heartbeat. + /// + public TimeSpan ControlQueueOrchHeartbeatLatencyThreshold { get; set; } = DefaultControlQueueOrchHeartbeatLatencyThreshold; + /// /// If true, takes a lease on the task hub container, allowing for only one app to process messages in a task hub at a time. /// diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs new file mode 100644 index 000000000..257e34ad2 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatDetectionInfo.cs @@ -0,0 +1,33 @@ +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Control-queue heartbeat detection information. + /// + public enum ControlQueueHeartbeatDetectionInfo + { + /// + /// Default value. 
+ /// + Unknown, + + /// + /// The orchestration instance for control-queue is stuck. + /// + OrchestrationInstanceStuck, + + /// + /// Expected orchestration instance for control-queue timed out. + /// + OrchestrationInstanceFetchTimedOut, + + /// + /// Expected orchestration instance for control-queue not found. + /// + OrchestrationInstanceNotFound, + + /// + /// Control-queue owner information fetch failed. + /// + ControlQueueOwnerFetchFailed + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs new file mode 100644 index 000000000..135f2c6c0 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskContext.cs @@ -0,0 +1,38 @@ +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Context for ControlQueueHeartbeat task orchestrator. + /// + public class ControlQueueHeartbeatTaskContext + { + /// + /// Name of taskhub. + /// + public string TaskHubName { get; private set; } + + /// + /// Number of partitions. + /// + public int PartitionCount { get; private set; } + + /// + /// ControlQueueHeartbeatTaskContext constructor. + /// + /// Name of taskhub. + /// Number of partitions. + public ControlQueueHeartbeatTaskContext(string taskhubName, int partitionCount) + { + this.TaskHubName = taskhubName; + this.PartitionCount = partitionCount; + } + + /// + /// Returns a string that represents the OrchestrationInstance. + /// + /// A string that represents the current object. + public override string ToString() + { + return $"[TaskHubName={TaskHubName}, PartitionCount = {this.PartitionCount}]"; + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs new file mode 100644 index 000000000..0494d489d --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskInputContext.cs @@ -0,0 +1,32 @@ +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Input for ControlQueueHeartbeat orchestration instance. + /// + public class ControlQueueHeartbeatTaskInputContext : ControlQueueHeartbeatTaskContext + { + /// + /// Control-queue name. + /// + public string ControlQueueName { get; private set; } + + /// + /// ControlQueueHeartbeatTaskInputContext constructor. + /// + /// Name of control queue. + /// Name of taskhub. + /// Name of partitionCount. + public ControlQueueHeartbeatTaskInputContext(string controlQueueName, string taskhubName, int partitionCount) + : base(taskhubName, partitionCount) + { + this.ControlQueueName = controlQueueName; + } + + /// Returns a string that represents the OrchestrationInstance. + /// A string that represents the current object. 
+ public override string ToString() + { + return $"[ControlQueueName={ControlQueueName}, TaskHubName={TaskHubName}, PartitionCount = {this.PartitionCount}]"; + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs new file mode 100644 index 000000000..f382caebd --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskOrchestrator.cs @@ -0,0 +1,119 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// Control-queue heartbeat orchestrator. + /// This is supposed to be initialized with ControlQueueHeartbeatTaskContext informing orchestrator about configuration of taskhubworker and heartbeat interval. + /// + internal class ControlQueueHeartbeatTaskOrchestrator : TaskOrchestration + { + public const string OrchestrationName = "ControlQueueHeartbeatTaskOrchestrator"; + + public const string OrchestrationVersion = "V1"; + + private ControlQueueHeartbeatTaskContext controlQueueHeartbeatTaskContextInit; + + private TimeSpan controlQueueHearbeatOrchestrationInterval; + + private Func callBack; + + private CancellationTokenSource cancellationTokenSrc; + + /// + /// ControlQueueHeartbeatTaskOrchestrator constructor. + /// + /// ControlQueueHeartbeatTaskContext object, informs about configuration of taskhubworker orchestrator is running in. + /// Interval between two heartbeats. + /// + /// A callback to allow user process/emit custom metrics for heartbeat execution. + /// + /// Throws if provided ControlQueueHeartbeatTaskContext object is null. + internal ControlQueueHeartbeatTaskOrchestrator( + ControlQueueHeartbeatTaskContext controlQueueHeartbeatTaskContext, + TimeSpan controlQueueHearbeatOrchestrationInterval, + Func callBack) + { + this.controlQueueHeartbeatTaskContextInit = controlQueueHeartbeatTaskContext ?? throw new ArgumentNullException(nameof(controlQueueHeartbeatTaskContext)); + this.controlQueueHearbeatOrchestrationInterval = controlQueueHearbeatOrchestrationInterval; + this.callBack = callBack; + + this.cancellationTokenSrc = new CancellationTokenSource(); + } + + public override async Task RunTask(OrchestrationContext context, ControlQueueHeartbeatTaskInputContext controlQueueHeartbeatTaskContextInput) + { + // Checks for input being null and complete gracefully. + if (controlQueueHeartbeatTaskContextInput == null) + { + // [Logs] Add log for failure of the orchestrator. + // Structured logging: ControlQueueHeartbeatTaskOrchestratorFailed + // -> orchestrationInstance: context.OrchestrationInstance.ToString() + // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString() + // -> inputControlQueueHeartbeatTaskContext: null + // -> message : input context orchestration is null. + FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorFailed." + + $"OrchestrationInstance:{context.OrchestrationInstance} " + + $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " + + $"message: controlQueueHeartbeatTaskContextInput is null. 
Completing the orchestration.");
+
+                return ControlQueueHeartbeatTaskResult.InvalidInput;
+            }
+
+            var isOrchestratorRunningInCorrectContext = controlQueueHeartbeatTaskContextInput.PartitionCount == controlQueueHeartbeatTaskContextInit.PartitionCount
+                && controlQueueHeartbeatTaskContextInit.TaskHubName.Equals(controlQueueHeartbeatTaskContextInput.TaskHubName);
+
+            // Checks whether the orchestration instance's input context matches the orchestrator's initial context; if not, complete gracefully.
+            if (!isOrchestratorRunningInCorrectContext)
+            {
+                // [Logs] Add log for mismatch in context.
+                // Structured logging: ControlQueueHeartbeatTaskOrchestratorFailed
+                // -> orchestrationInstance: context.OrchestrationInstance.ToString()
+                // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
+                // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInput.ToString()
+                // -> message : Input and initial contexts for the orchestration do not match.
+                FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestratorContextMismatch" +
+                    $"OrchestrationInstance:{context.OrchestrationInstance} " +
+                    $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
+                    $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
+                    $"message: the partition count and task hub information do not match.");
+
+                return ControlQueueHeartbeatTaskResult.InputContextMismatch;
+            }
+
+            // Waiting for the heartbeat orchestration interval.
+            await context.CreateTimer(context.CurrentUtcDateTime.Add(controlQueueHearbeatOrchestrationInterval), true);
+
+            // Ensuring this section doesn't run again on replay.
+            // This queues the user-provided callback without waiting for it to finish,
+            // to keep the heartbeat orchestrator thin and fast.
+            if (!context.IsReplaying)
+            {
+                // Do not wait for the provided delegate to complete. The orchestrator needs to stay very thin and quick to run.
+                // The callback is queued independently to guard the orchestrator's cadence against long-running or failing callbacks.
+                bool isQueued = ThreadPool.QueueUserWorkItem(async (_) =>
+                {
+                    await this.callBack(context.OrchestrationInstance, controlQueueHeartbeatTaskContextInput, controlQueueHeartbeatTaskContextInit, this.cancellationTokenSrc.Token);
+                });
+            }
+
+            // [Logs] Add log for a heartbeat message from the current instance.
+            // Structured logging: ControlQueueHeartbeatTaskOrchestrator
+            // -> orchestrationInstance: context.OrchestrationInstance.ToString()
+            // -> initialControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInit.ToString()
+            // -> inputControlQueueHeartbeatTaskContext: controlQueueHeartbeatTaskContextInput.ToString()
+            FileWriter.WriteLogControlQueueOrch($"ControlQueueHeartbeatTaskOrchestrator " +
+                $"OrchestrationInstance:{context.OrchestrationInstance} " +
+                $"controlQueueHeartbeatTaskContextInit:{controlQueueHeartbeatTaskContextInit}, " +
+                $"controlQueueHeartbeatTaskContextInput: {controlQueueHeartbeatTaskContextInput}" +
+                $"message: Sending signal for control-queue heartbeat.");
+
+            context.ContinueAsNew(controlQueueHeartbeatTaskContextInput);
+
+            return ControlQueueHeartbeatTaskResult.Succeeded;
+        }
+    }
+}
diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
new file mode 100644
index 000000000..828ec5910
--- /dev/null
+++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/ControlQueueHeartbeatTaskResult.cs
@@ -0,0 +1,28 @@
+namespace DurableTask.AzureStorage.ControlQueueHeartbeat
+{
+    internal enum ControlQueueHeartbeatTaskResult
+    {
+        /// <summary>
+        /// Default value of ControlQueueHeartbeatTaskResult.
+        /// </summary>
+        Unknown,
+
+        /// <summary>
+        /// Orchestration succeeded.
+        /// </summary>
+        Succeeded,
+
+        /// <summary>
+        /// Invalid input.
+        /// Happens when the input is either null or not of type <see cref="ControlQueueHeartbeatTaskInputContext"/>.
+        /// </summary>
+        InvalidInput,
+
+        /// <summary>
+        /// Mismatch between the heartbeat orchestrator's context and the input of the orchestration instance.
+        /// This happens when the TaskHubWorker is running with a different partition count than the one the orchestration instance was queued with.
+        /// Usually this happens when the partition count of a task hub is changed.
+ /// + InputContextMismatch + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs new file mode 100644 index 000000000..04f35cc78 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/FileWriter.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// + /// + /// + public class FileWriter + { + static string timeTicks = DateTime.UtcNow.Ticks.ToString(); + static string path = @$"C:\DTF\v2.16.1\Logs_{timeTicks}.txt"; + static string pathControlQueueMonitor = @$"C:\DTF\v2.16.1\Logs_ControlQueueMonitor_{timeTicks}.txt"; + static string pathControlQueueOrch = @$"C:\DTF\v2.16.1\Logs_ControlQueueOrch_{timeTicks}.txt"; + static string pathControlQueueProgram = @$"C:\DTF\v2.16.1\Logs_ControlQueueProgram_{timeTicks}.txt"; + + static object obj = new object(); + + private static void WriteLog(string path, string msg) + { + var modMsg = $"[{DateTime.UtcNow.ToLongTimeString()}] : {msg}" + Environment.NewLine; + Console.WriteLine(modMsg); + + lock (obj) + { + File.AppendAllText(path, modMsg); + } + } + + /// + /// + /// + /// + public static void WriteLogControlQueueMonitor(string msg) + { + WriteLog(pathControlQueueMonitor, msg); + } + + /// + /// + /// + /// + public static void WriteLogControlQueueOrch(string msg) + { + WriteLog(pathControlQueueOrch, msg); + } + + /// + /// + /// + /// + public static void WriteLogControlQueueProgram(string msg) + { + WriteLog(pathControlQueueProgram, msg); + } + } +} diff --git a/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs new file mode 100644 index 000000000..56c9ca651 --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHeartbeat/IControlQueueHelper.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; + +namespace DurableTask.AzureStorage.ControlQueueHeartbeat +{ + /// Monitors control queue health for orchestrator's processing. Make sure to provide same setting for taskhubclient, taskhubworker and IControlQueueHealthMonitor. + public interface IControlQueueHelper + { +#nullable enable + /// + /// Sets up the TaskHub client and worker for control-queue heartbeat and detects if any of heartbeat orchestration running on each control-queue is not running. + /// + /// TaskHubClient object. + /// TaskHubWorker object. + /// Callback to run with each execution of the orchestrator of type . + /// Callback to run with each time the detects fault of type . + /// Cancellation token. + /// Task result. + Task StartControlQueueHeartbeatMonitorAsync( + TaskHubClient taskHubClient, + TaskHubWorker taskHubWorker, + Func callBackHeartOrchAsync, + Func callBackControlQueueValidation, + CancellationToken cancellationToken); +#nullable disable + + /// + /// Adds orchestrator instances of type for each control queue. + /// + /// TaskHubClient object. + /// CancellationToken. + /// If true, creates new instances of orchestrator, otherwise creates only if there is no running instance of orchestrator with same instance id.. + /// Task result. 
+        Task ScheduleControlQueueHeartbeatOrchestrationsAsync(TaskHubClient taskHubClient, CancellationToken cancellationToken, bool force = false);
+
+        /// <summary>
+        /// Registers the <see cref="ControlQueueHeartbeatTaskOrchestrator"/> orchestration with the TaskHubWorker object.
+        /// </summary>
+        /// <param name="taskHubWorker">TaskHubWorker object.</param>
+        /// <param name="callBackHeartOrchAsync">Callback to run on each execution of the <see cref="ControlQueueHeartbeatTaskOrchestrator"/> orchestrator.</param>
+        void RegisterControlQueueHeartbeatOrchestration(
+            TaskHubWorker taskHubWorker,
+            Func<OrchestrationInstance, ControlQueueHeartbeatTaskInputContext, ControlQueueHeartbeatTaskContext, CancellationToken, Task> callBackHeartOrchAsync);
+
+        /// <summary>
+        /// Gets an instanceId that is targeted at one of the specified control-queue numbers.
+        /// </summary>
+        /// <param name="controlQueueNumbers">Collection of control-queue numbers.</param>
+        /// <param name="instanceIdPrefix">InstanceId prefix.</param>
+        /// <returns>InstanceId for the control-queue.</returns>
+        string GetControlQueueInstanceId(HashSet<int> controlQueueNumbers, string instanceIdPrefix = "");
+    }
+}
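For reference, a minimal sketch (not part of this diff) of how a host could consume the new IControlQueueHelper surface; it assumes the same AzureStorageOrchestrationServiceSettings instance is shared by the client and worker, and MyOrchestration/input are placeholder names:

    var service = new AzureStorageOrchestrationService(settings);
    IControlQueueHelper helper = service;

    // Pin a new orchestration instance to control queue 0 or 1 of this task hub.
    string pinnedInstanceId = helper.GetControlQueueInstanceId(new HashSet<int> { 0, 1 }, "pinned_");
    var client = new TaskHubClient(service);
    await client.CreateOrchestrationInstanceAsync(typeof(MyOrchestration), pinnedInstanceId, input);

    // Or start the heartbeat monitor alongside an existing worker; both callbacks should stay short.
    var worker = new TaskHubWorker(service);
    await helper.StartControlQueueHeartbeatMonitorAsync(
        client,
        worker,
        async (instance, inputContext, workerContext, token) => { /* emit a heartbeat metric */ },
        async (workerId, ownerId, isOwner, queueName, heartbeatInstanceId, detectionInfo, token) => { /* alert: control queue appears stuck */ },
        CancellationToken.None);
    await worker.StartAsync();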