From 04200179162d5b4c385b6fd6af71a83a88699ece Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 19 Mar 2021 14:33:35 +0100 Subject: [PATCH] Remove async/await where not needed --- .../src/Amqp/AmqpReceiver.cs | 148 ++++++++---------- .../src/Amqp/AmqpSender.cs | 68 +++----- 2 files changed, 85 insertions(+), 131 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 9904af2eb7669..7e380668b5943 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -237,24 +237,22 @@ private static void CloseLink(RequestResponseAmqpLink link) /// An optional instance to signal the request to cancel the operation. /// /// List of messages received. Returns an empty list if no message is found. - public override async Task> ReceiveMessagesAsync( + public override Task> ReceiveMessagesAsync( int maxMessages, TimeSpan? maxWaitTime, - CancellationToken cancellationToken) - { - return await _retryPolicy.RunOperation(static async (value, timeout, token) => + CancellationToken cancellationToken) => + _retryPolicy.RunOperation(static (value, timeout, token) => { var (receiver, maxmsgs, maxwait) = value; - return await receiver.ReceiveMessagesAsyncInternal( + return receiver.ReceiveMessagesAsyncInternal( maxmsgs, maxwait, timeout, - token).ConfigureAwait(false); + token); }, (this, maxMessages, maxWaitTime), _connectionScope, - cancellationToken).ConfigureAwait(false); - } + cancellationToken); /// /// Receives a list of from the Service Bus entity. @@ -345,20 +343,18 @@ private async Task> ReceiveMessagesAsyn /// /// /// A task to be resolved on when the operation has completed. - public override async Task CompleteAsync( + public override Task CompleteAsync( string lockToken, CancellationToken cancellationToken = default) => - await _retryPolicy.RunOperation( - static async (value, timeout, _) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, lckToken) = value; - await receiver.CompleteInternalAsync( - lckToken, - timeout).ConfigureAwait(false); + return receiver.CompleteInternalAsync(lckToken, timeout); }, (this, lockToken), _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); /// /// Completes a series of using a list of lock tokens. This will delete the message from the service. @@ -489,22 +485,19 @@ private void ThrowLockLostException() /// /// /// A task to be resolved on when the operation has completed. - public override async Task DeferAsync( + public override Task DeferAsync( string lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) => - await _retryPolicy.RunOperation( - static async (value, timeout, _) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, lckToken, properties) = value; - await receiver.DeferInternalAsync( - lckToken, - timeout, - properties).ConfigureAwait(false); + return receiver.DeferInternalAsync(lckToken, timeout, properties); }, (this, lockToken, propertiesToModify), _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); /// Indicates that the receiver wants to defer the processing for the message. /// @@ -546,22 +539,19 @@ private Task DeferInternalAsync( /// /// /// A task to be resolved on when the operation has completed. - public override async Task AbandonAsync( + public override Task AbandonAsync( string lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) => - await _retryPolicy.RunOperation( - static async (value, timeout, _) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, lckToken, properties) = value; - await receiver.AbandonInternalAsync( - lckToken, - timeout, - properties).ConfigureAwait(false); + return receiver.AbandonInternalAsync(lckToken, timeout, properties); }, (this, lockToken, propertiesToModify), _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); /// /// Abandons a using a lock token. This will make the message available again for processing. @@ -608,26 +598,21 @@ private Task AbandonInternalAsync( /// /// /// A task to be resolved on when the operation has completed. - public override async Task DeadLetterAsync( + public override Task DeadLetterAsync( string lockToken, string deadLetterReason, string deadLetterErrorDescription = default, IDictionary propertiesToModify = default, CancellationToken cancellationToken = default) => - await _retryPolicy.RunOperation( - static async (value, timeout, _) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, lckToken, properties, reason, description) = value; - await receiver.DeadLetterInternalAsync( - lckToken, - timeout, - properties, - reason, - description).ConfigureAwait(false); + return receiver.DeadLetterInternalAsync(lckToken, timeout, properties, reason, description); }, (this, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription), _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); /// /// Moves a message to the dead-letter subqueue. @@ -840,26 +825,25 @@ private static Outcome GetModifiedOutcome( /// Also, unlike , this method will fetch even Deferred messages (but not Deadlettered message) /// /// - public override async Task> PeekMessagesAsync( + public override Task> PeekMessagesAsync( long? sequenceNumber, int messageCount = 1, CancellationToken cancellationToken = default) { long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1; - return await _retryPolicy.RunOperation( - static async (value, timeout, token) => + return _retryPolicy.RunOperation( + static (value, timeout, token) => { var (receiver, number, count) = value; - return await receiver.PeekMessagesInternalAsync( + return receiver.PeekMessagesInternalAsync( number, count, timeout, - token) - .ConfigureAwait(false); + token); }, (this, seqNumber, messageCount), _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); } /// @@ -947,22 +931,20 @@ private async Task> PeekMessagesInterna /// /// Lock token associated with the message. /// An optional instance to signal the request to cancel the operation. - public override async Task RenewMessageLockAsync( + public override Task RenewMessageLockAsync( string lockToken, - CancellationToken cancellationToken) - { - return await _retryPolicy.RunOperation( - static async (value, timeout, _) => + CancellationToken cancellationToken) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, lckToken) = value; - return await receiver.RenewMessageLockInternalAsync( + return receiver.RenewMessageLockInternalAsync( lckToken, - timeout).ConfigureAwait(false); + timeout); }, (this, lockToken), _connectionScope, - cancellationToken).ConfigureAwait(false); - } + cancellationToken); /// /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue. @@ -1024,8 +1006,7 @@ private async Task ExecuteRequest(TimeSpan timeout, AmqpReq public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default) { var lockedUntil = await _retryPolicy.RunOperation( - static async (receiver, timeout, _) => await receiver.RenewSessionLockInternal( - timeout).ConfigureAwait(false), + static (receiver, timeout, _) => receiver.RenewSessionLockInternal(timeout), this, _connectionScope, cancellationToken).ConfigureAwait(false); @@ -1068,13 +1049,13 @@ internal async Task RenewSessionLockInternal(TimeSpan timeout) /// An optional instance to signal the request to cancel the operation. /// /// The session state as . - public override async Task GetStateAsync(CancellationToken cancellationToken = default) + public override Task GetStateAsync(CancellationToken cancellationToken = default) { - return await _retryPolicy.RunOperation( - static async (receiver, timeout, _) => await receiver.GetStateInternal(timeout).ConfigureAwait(false), + return _retryPolicy.RunOperation( + static (receiver, timeout, _) => receiver.GetStateInternal(timeout), this, _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); } internal async Task GetStateInternal(TimeSpan timeout) @@ -1116,22 +1097,20 @@ internal async Task GetStateInternal(TimeSpan timeout) /// This state is stored on Service Bus forever unless you set an empty state on it. /// /// A task to be resolved on when the operation has completed. - public override async Task SetStateAsync( + public override Task SetStateAsync( BinaryData sessionState, - CancellationToken cancellationToken) - { - await _retryPolicy.RunOperation( - static async (value, timeout, _) => + CancellationToken cancellationToken) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, state) = value; - await receiver.SetStateInternal( + return receiver.SetStateInternal( state, - timeout).ConfigureAwait(false); + timeout); }, (this, sessionState), _connectionScope, - cancellationToken).ConfigureAwait(false); - } + cancellationToken); internal async Task SetStateInternal( BinaryData sessionState, @@ -1171,22 +1150,20 @@ internal async Task SetStateInternal( /// Messages identified by sequence number are returned. Returns null if no messages are found. /// Throws if the messages have not been deferred. /// - public override async Task> ReceiveDeferredMessagesAsync( + public override Task> ReceiveDeferredMessagesAsync( long[] sequenceNumbers, - CancellationToken cancellationToken = default) - { - return await _retryPolicy.RunOperation( - static async (value, timeout, _) => + CancellationToken cancellationToken = default) => + _retryPolicy.RunOperation( + static (value, timeout, _) => { var (receiver, sqn) = value; - return await receiver.ReceiveDeferredMessagesAsyncInternal( + return receiver.ReceiveDeferredMessagesAsyncInternal( sqn, - timeout).ConfigureAwait(false); + timeout); }, (this, sequenceNumbers), _connectionScope, - cancellationToken).ConfigureAwait(false); - } + cancellationToken); internal virtual async Task> ReceiveDeferredMessagesAsyncInternal( long[] sequenceNumbers, @@ -1322,14 +1299,13 @@ private static TimeSpan UseMinimum( /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. - public override async Task OpenLinkAsync(CancellationToken cancellationToken) + public override Task OpenLinkAsync(CancellationToken cancellationToken) { - _ = await _retryPolicy.RunOperation( - static async (link, timeout, _) => - await link.GetOrCreateAsync(timeout).ConfigureAwait(false), + return _retryPolicy.RunOperation( + static (link, timeout, _) => link.GetOrCreateAsync(timeout), _receiveLink, _connectionScope, - cancellationToken).ConfigureAwait(false); + cancellationToken); } private bool HasLinkCommunicationError(ReceivingAmqpLink link) => diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index f1868ebcae231..d3090cc496daf 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -161,7 +161,7 @@ public override async ValueTask CreateMessageBatchAsync( CreateMessageBatchOptions options, CancellationToken cancellationToken) { - Task createBatchTask = _retryPolicy.RunOperation(static async (value, timeout, _) => + return await _retryPolicy.RunOperation(static async (value, timeout, _) => { var (sender, ops) = value; return await sender.CreateMessageBatchInternalAsync( @@ -170,8 +170,7 @@ public override async ValueTask CreateMessageBatchAsync( }, (this, options), _connectionScope, - cancellationToken); - return await createBatchTask.ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); } internal async ValueTask CreateMessageBatchInternalAsync( @@ -206,22 +205,17 @@ internal async ValueTask CreateMessageBatchInternalAsync( /// /// A task to be resolved on when the operation has completed. /// - public override async Task SendBatchAsync( + public override Task SendBatchAsync( ServiceBusMessageBatch messageBatch, - CancellationToken cancellationToken) - { - await _retryPolicy.RunOperation(static async (value, timeout, token) => + CancellationToken cancellationToken) => + _retryPolicy.RunOperation(static (value, timeout, token) => { var (sender, msgs) = value; - await sender.SendBatchInternalAsync( - msgs, - timeout, - token).ConfigureAwait(false); + return sender.SendBatchInternalAsync(msgs, timeout, token); }, (this, messageBatch.AsEnumerable()), - _connectionScope, - cancellationToken).ConfigureAwait(false); - } + _connectionScope, + cancellationToken); /// /// Sends a set of messages to the associated Queue/Topic using a batched approach. @@ -304,22 +298,20 @@ internal virtual async Task SendBatchInternalAsync( /// /// The list of messages to send. /// An optional instance to signal the request to cancel the operation. - public override async Task SendAsync( + public override Task SendAsync( IReadOnlyList messages, - CancellationToken cancellationToken) - { - await _retryPolicy.RunOperation(static async (value, timeout, token) => + CancellationToken cancellationToken) => + _retryPolicy.RunOperation(static (value, timeout, token) => { var (sender, msgs) = value; - await sender.SendBatchInternalAsync( + return sender.SendBatchInternalAsync( msgs, timeout, - token).ConfigureAwait(false); + token); }, (this, messages), - _connectionScope, - cancellationToken).ConfigureAwait(false); - } + _connectionScope, + cancellationToken); /// /// Closes the connection to the transport sender instance. @@ -382,28 +374,17 @@ public override async Task> ScheduleMessagesAsync( IReadOnlyList messages, CancellationToken cancellationToken = default) { - return await _retryPolicy.RunOperation(static async (value, timeout, token) => + return await _retryPolicy.RunOperation(static (value, timeout, token) => { var (sender, msgs) = value; - return await sender - .ScheduleMessageInternalAsync( - msgs, - timeout, - token).ConfigureAwait(false); + return sender.ScheduleMessageInternalAsync(msgs, timeout, token); }, (this, messages), _connectionScope, cancellationToken).ConfigureAwait(false) ?? Array.Empty(); } - /// - /// - /// - /// - /// - /// - /// - internal async Task ScheduleMessageInternalAsync( + internal async Task> ScheduleMessageInternalAsync( IReadOnlyList messages, TimeSpan timeout, CancellationToken cancellationToken = default) @@ -494,23 +475,20 @@ internal async Task ScheduleMessageInternalAsync( /// /// /// - public override async Task CancelScheduledMessagesAsync( + public override Task CancelScheduledMessagesAsync( long[] sequenceNumbers, - CancellationToken cancellationToken = default) - { - Task cancelMessageTask = _retryPolicy.RunOperation(static async (value, timeout, token) => + CancellationToken cancellationToken = default) => + _retryPolicy.RunOperation(static (value, timeout, token) => { var (sender, sqn) = value; - await sender.CancelScheduledMessageInternalAsync( + return sender.CancelScheduledMessageInternalAsync( sqn, timeout, - token).ConfigureAwait(false); + token); }, (this, sequenceNumbers), _connectionScope, cancellationToken); - await cancelMessageTask.ConfigureAwait(false); - } /// ///