Skip to content

Commit

Permalink
Fall back to management link when settling non-session message (#37704)
Browse files Browse the repository at this point in the history
* Fall back to management link when settling non-session message

* fix typo

* fix test
  • Loading branch information
JoshLove-msft authored Jul 19, 2023
1 parent 3371dea commit cbc9eac
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 20 deletions.
60 changes: 48 additions & 12 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ await receiver.CompleteInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
private async Task CompleteInternalAsync(
Guid lockToken,
TimeSpan timeout)
Expand All @@ -460,20 +460,28 @@ await DisposeMessageRequestResponseAsync(
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, DispositionStatus.Completed, timeout).ConfigureAwait(false);
}

/// <summary>
/// Settles a <see cref="ServiceBusReceivedMessage"/> using a lock token.
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="outcome"></param>
/// <param name="timeout"></param>
/// <param name="outcome">The outcome of the message - used when disposing over receive link.</param>
/// <param name="disposition">The disposition of the message - used when disposing over the management link.</param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">Properties to modify when deadlettering, deferring, or abandoning.</param>
/// <param name="deadLetterReason">Dead letter reason. Only valid when deadlettering.</param>
/// <param name="deadLetterDescription">Dead letter description. Only valid when deadlettering.</param>
private async Task DisposeMessageAsync(
Guid lockToken,
Outcome outcome,
TimeSpan timeout)
DispositionStatus disposition,
TimeSpan timeout,
IDictionary<string, object> propertiesToModify = null,
string deadLetterReason = null,
string deadLetterDescription = null)
{
byte[] bufferForLockToken = ArrayPool<byte>.Shared.Rent(SizeOfGuidInBytes);
GuidUtilities.WriteGuidToBuffer(lockToken, bufferForLockToken.AsSpan(0, SizeOfGuidInBytes));
Expand Down Expand Up @@ -508,7 +516,21 @@ private async Task DisposeMessageAsync(
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
ThrowLockLostException();
if (_isSessionReceiver)
{
ThrowLockLostException();
}

// The message was not found on the link which can occur as a result of a reconnect.
// Attempt to settle the message over the management link.
await DisposeMessageRequestResponseAsync(
lockToken,
timeout,
disposition,
propertiesToModify: propertiesToModify,
deadLetterReason: deadLetterReason,
deadLetterDescription: deadLetterDescription).ConfigureAwait(false);
return;
}

throw error.ToMessagingContractException();
Expand Down Expand Up @@ -587,7 +609,7 @@ await receiver.DeferInternalAsync(
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
///
/// <param name="lockToken">The lock token of the <see cref="ServiceBusMessage" />.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param>
///
private Task DeferInternalAsync(
Expand All @@ -605,7 +627,12 @@ private Task DeferInternalAsync(
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(
lockToken,
GetDeferOutcome(propertiesToModify),
DispositionStatus.Defered,
timeout,
propertiesToModify: propertiesToModify);
}

/// <summary>
Expand Down Expand Up @@ -645,7 +672,7 @@ await receiver.AbandonInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
private Task AbandonInternalAsync(
Guid lockToken,
Expand All @@ -662,7 +689,12 @@ private Task AbandonInternalAsync(
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(
lockToken,
GetAbandonOutcome(propertiesToModify),
DispositionStatus.Abandoned,
timeout,
propertiesToModify: propertiesToModify);
}

/// <summary>
Expand Down Expand Up @@ -710,7 +742,7 @@ await receiver.DeadLetterInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lock token of the corresponding message to dead-letter.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to subqueue.</param>
/// <param name="deadLetterReason">The reason for dead-lettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for dead-lettering the message.</param>
Expand Down Expand Up @@ -740,7 +772,11 @@ internal virtual Task DeadLetterInternalAsync(
return DisposeMessageAsync(
lockToken,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
DispositionStatus.Suspended,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription);
}

private static Rejected GetRejectedOutcome(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,106 @@ public async Task PeekSingleMessage()
}
}

[Test]
public async Task CanRenewWithSeparateReceiver()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusSender sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
var receiver1 = client.CreateReceiver(scope.QueueName);
var message1 = await receiver1.ReceiveMessageAsync();
await receiver1.RenewMessageLockAsync(message1);

var receiver2 = client.CreateReceiver(scope.QueueName);
await receiver2.RenewMessageLockAsync(message1);
await receiver2.CompleteMessageAsync(message1);
}
}

[Test]
public async Task CanCompleteAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.CompleteMessageAsync(message);
}
}

[Test]
public async Task CanAbandonAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.AbandonMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }});
message = await receiver.ReceiveMessageAsync();
Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
}
}

[Test]
public async Task CanDeferAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.DeferMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }});
message = await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber);
Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
}
}

[Test]
public async Task CanDeadLetterAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.DeadLetterMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }}, "test reason", "test description");

var dlqReceiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
var dlqMessage = await dlqReceiver.ReceiveMessageAsync();
Assert.AreEqual("test reason", dlqMessage.DeadLetterReason);
Assert.AreEqual("test description", dlqMessage.DeadLetterErrorDescription);
Assert.AreEqual("test value", dlqMessage.ApplicationProperties["test key"]);
}
}

[Test]
public async Task PeekMessagesWithACustomIdentifier()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,5 +1064,85 @@ public async Task OpenSessionIsNotClosedWhenAcceptNextSessionTimesOut(bool enabl
Assert.IsNotNull(message);
}
}

[Test]
public async Task CannotCompleteAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.CompleteMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CanAbandonAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.AbandonMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CannotDeferAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.DeferMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CannotDeadLetterAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.DeadLetterMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,7 @@ public async Task TransactionThrowsWhenOperationsOfDifferentPartitionsAreInSameT

await receiver.CompleteMessageAsync(receivedMessage1);

// the service seems to abandon the message that
// triggered the InvalidOperationException
// in the transaction
Assert.That(
async () =>
await receiver.CompleteMessageAsync(receivedMessage2), Throws.InstanceOf<ServiceBusException>()
.And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.MessageLockLost));
await receiver.CompleteMessageAsync(receivedMessage2);
}
}

Expand Down

0 comments on commit cbc9eac

Please sign in to comment.