Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBusRetryPolicy generic overloads to avoid closure capturing #19522

Merged
merged 13 commits into from
Mar 22, 2021
Merged
168 changes: 92 additions & 76 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,18 @@ public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMess
TimeSpan? maxWaitTime,
CancellationToken cancellationToken)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(async (timeout) =>
{
messages = await ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
timeout,
cancellationToken).ConfigureAwait(false);
},
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
return await _retryPolicy.RunOperation(static async (value, timeout, token) =>
{
var (receiver, maxMessages, maxWaitTime) = value;
return await receiver.ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
timeout,
token).ConfigureAwait(false);
},
(this, maxMessages, maxWaitTime),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -350,10 +349,14 @@ public override async Task CompleteAsync(
string lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) =>
await CompleteInternalAsync(
lockToken,
timeout).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken) = value;
await receiver.CompleteInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -491,10 +494,15 @@ public override async Task DeferAsync(
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeferInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, properties) = value;
await receiver.DeferInternalAsync(
lockToken,
timeout,
properties).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -543,10 +551,15 @@ public override async Task AbandonAsync(
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await AbandonInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, properties) = value;
await receiver.AbandonInternalAsync(
lockToken,
timeout,
properties).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -602,12 +615,17 @@ public override async Task DeadLetterAsync(
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeadLetterInternalAsync(
lockToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false),
static async (value, timeout, _) =>
{
var (receiver, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription) = value;
await receiver.DeadLetterInternalAsync(
lockToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false);
},
(this, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription),
_connectionScope,
cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -828,20 +846,20 @@ public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessage
CancellationToken cancellationToken = default)
{
long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
IReadOnlyList<ServiceBusReceivedMessage> messages = null;

await _retryPolicy.RunOperation(
async (timeout) =>
messages = await PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
cancellationToken)
.ConfigureAwait(false),
return await _retryPolicy.RunOperation(
static async (value, timeout, token) =>
{
var (receiver, seqNumber, messageCount) = value;
return await receiver.PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
token)
.ConfigureAwait(false);
},
(this, seqNumber, messageCount),
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
}

/// <summary>
Expand Down Expand Up @@ -933,17 +951,17 @@ public override async Task<DateTimeOffset> RenewMessageLockAsync(
string lockToken,
CancellationToken cancellationToken)
{
DateTimeOffset lockedUntil = DateTimeOffset.MinValue;
await _retryPolicy.RunOperation(
async (timeout) =>
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
{
lockedUntil = await RenewMessageLockInternalAsync(
var (receiver, lockToken) = value;
return await receiver.RenewMessageLockInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return lockedUntil;
}

/// <summary>
Expand Down Expand Up @@ -1005,13 +1023,10 @@ private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpReq
/// </summary>
public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
{
DateTimeOffset lockedUntil;
await _retryPolicy.RunOperation(
async (timeout) =>
{
lockedUntil = await RenewSessionLockInternal(
timeout).ConfigureAwait(false);
},
var lockedUntil = await _retryPolicy.RunOperation(
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
static async (receiver, timeout, _) => await receiver.RenewSessionLockInternal(
timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
SessionLockedUntil = lockedUntil;
Expand Down Expand Up @@ -1055,15 +1070,11 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
BinaryData sessionState = default;
await _retryPolicy.RunOperation(
async (timeout) =>
{
sessionState = await GetStateInternal(timeout).ConfigureAwait(false);
},
return await _retryPolicy.RunOperation(
static async (receiver, timeout, _) => await receiver.GetStateInternal(timeout).ConfigureAwait(false),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return sessionState;
}

internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
Expand Down Expand Up @@ -1110,12 +1121,14 @@ public override async Task SetStateAsync(
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
async (timeout) =>
static async (value, timeout, _) =>
{
await SetStateInternal(
var (receiver, sessionState) = value;
await receiver.SetStateInternal(
sessionState,
timeout).ConfigureAwait(false);
},
(this, sessionState),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -1162,14 +1175,17 @@ public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDefe
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(
async (timeout) => messages = await ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
timeout).ConfigureAwait(false),
return await _retryPolicy.RunOperation(
static async (value, timeout, _) =>
{
var (receiver, sequenceNumbers) = value;
return await receiver.ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
timeout).ConfigureAwait(false);
},
(this, sequenceNumbers),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return messages;
}

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
Expand Down Expand Up @@ -1308,12 +1324,12 @@ private static TimeSpan UseMinimum(
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task OpenLinkAsync(CancellationToken cancellationToken)
{
ReceivingAmqpLink link = null;
await _retryPolicy.RunOperation(
async (timeout) =>
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_connectionScope,
cancellationToken).ConfigureAwait(false);
_ = await _retryPolicy.RunOperation(
static async (receiveLink, timeout, _) =>
await receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_receiveLink,
_connectionScope,
cancellationToken).ConfigureAwait(false);
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down
Loading