Skip to content

Commit

Permalink
[Storage][Function] Fix visibility timeout extension in queues extens…
Browse files Browse the repository at this point in the history
…ion. (#22569)

* Fix visibility timeout extension.

* wip
  • Loading branch information
kasobol-msft committed Jul 9, 2021
1 parent 89a228c commit 0c94883
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners
internal class UpdateQueueMessageVisibilityCommand : ITaskSeriesCommand
{
private readonly QueueClient _queue;
private readonly QueueMessage _message;
private volatile QueueMessage _message;
private readonly TimeSpan _visibilityTimeout;
private readonly IDelayStrategy _speedupStrategy;
private readonly Action<UpdateReceipt> _onUpdateReceipt;
Expand Down Expand Up @@ -51,6 +51,7 @@ public async Task<TaskSeriesCommandResult> ExecuteAsync(CancellationToken cancel
try
{
UpdateReceipt updateReceipt = await _queue.UpdateMessageAsync(_message.MessageId, _message.PopReceipt, visibilityTimeout: _visibilityTimeout, cancellationToken: cancellationToken).ConfigureAwait(false);
_message = _message.Update(updateReceipt);
_onUpdateReceipt?.Invoke(updateReceipt);
// The next execution should occur after a normal delay.
delay = _speedupStrategy.GetNextDelay(executionSucceeded: true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release History

## 5.0.0-beta.5 (Unreleased)
- Fixed bug where QueueTrigger would fail to renew ownership of message if function runs for long period of time (i.e. 15 minutes and longer).


## 5.0.0-beta.4 (2021-05-18)
- Fixed bug where custom implementations of `IQueueProcessorFactory` could overwrite each other settings.
- Added new configuration formats so extensions that need multiple storage services can specify them in one connection configuration.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests
{
using global::Azure.Storage.Queues;
using global::Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Timers;
using Moq;
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;

public class UpdateQueueMessageVisibilityCommandTests
{
private QueueServiceClient _queueServiceClient;

[SetUp]
public void SetUp()
{
_queueServiceClient = AzuriteNUnitFixture.Instance.GetQueueServiceClient();
}

[Test]
public async Task CanExtendVisibilityTimeoutMultipleTimes()
{
// Arange
string queueName = Guid.NewGuid().ToString();
var queueClient = _queueServiceClient.GetQueueClient(queueName);
await queueClient.CreateIfNotExistsAsync();
await queueClient.SendMessageAsync("foo");
QueueMessage message = await queueClient.ReceiveMessageAsync(visibilityTimeout: TimeSpan.FromSeconds(60));
Mock<IDelayStrategy> delayStrategyMock = new Mock<IDelayStrategy>();
delayStrategyMock.Setup(x => x.GetNextDelay(It.IsAny<bool>())).Returns(TimeSpan.FromMilliseconds(1));

int counter = 0;
Action<UpdateReceipt> onUpdateReceipt = updateReceipt => { counter++; };

var command = new UpdateQueueMessageVisibilityCommand(queueClient, message, TimeSpan.FromSeconds(60), delayStrategyMock.Object, onUpdateReceipt);

// Act
var commandResult1 = await command.ExecuteAsync(CancellationToken.None);
var commandResult2 = await command.ExecuteAsync(CancellationToken.None);

// Assert
// If renewal was successful then result should wait as much as IDelayStrategy decided (see mock above).
// Otherwise if it failed due to non-transient reason (i.e. pop recepit got lost) it would delay infinitely leading to assertion error below.
Assert.IsTrue(commandResult1.Wait.Wait(TimeSpan.FromSeconds(1)));
Assert.IsTrue(commandResult2.Wait.Wait(TimeSpan.FromSeconds(1)));
// Check the counter too
Assert.AreEqual(2, counter);
}
}
}

0 comments on commit 0c94883

Please sign in to comment.