From 15b5e12249442ebf6f583177e646204c88bdd1df Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 18 Apr 2024 16:34:13 -0700 Subject: [PATCH] add mitigation for misrouted messages --- ...zureStorageOrchestrationServiceSettings.cs | 7 ++++ .../Messaging/ControlQueue.cs | 32 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..abeecb06e 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,5 +318,12 @@ internal LogHelper Logger /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Gets or sets whether or not misrouted queue message should be corrected, according to the instanceID hashing function. + /// This defaults to false, to avoid possible "infinite re-routing" of messages. Users may set this to true to try to recover + /// from accidentally changing the partitionCount of an existing TaskHub, which can lead to misrouted messages. + /// + public bool CorrectMisourtedMessages { get; set; } = false; } } diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 4484c3ee9..43ac457ba 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -46,6 +46,32 @@ public ControlQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout; + private async Task correctMessagesIfMisourted(MessageData messageData) + { + // validate that the message came from the expected queue + string instanceId = messageData.TaskMessage.OrchestrationInstance.InstanceId; + uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; + string expectedQueueOfOrigin = AzureStorageOrchestrationService.GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex); + + // route to the right queue if the user opted in to the mitigation. + // This assumes that all workers already have correct partitionCount configuration + // RISK - if different workers have different partitionCount values, + // this could lead to infinite re-routing of orchestrator messages. + if (expectedQueueOfOrigin != this.Name) + { + // obtain reference to expected queue (should have been created already) + var expectedQueue = this.azureStorageClient.GetQueueReference(expectedQueueOfOrigin); + await expectedQueue.CreateIfNotExistsAsync(); + + // place on correct queue + var originalMessage = messageData.OriginalQueueMessage; + await expectedQueue.AddMessageAsync(originalMessage, TimeSpan.FromMinutes(1)); + + // delete message from current queue + await this.storageQueue.DeleteMessageAsync(originalMessage); + } + } + public async Task> GetMessagesAsync(CancellationToken cancellationToken) { using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken)) @@ -108,6 +134,12 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) messageData = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, this.storageQueue.Name); + + if (this.settings.CorrectMisourtedMessages) + { + await correctMessagesIfMisourted(messageData); + return; // skip further processing of this message + } } catch (Exception e) {