From 549f4617960949f2a3b2fd242380231ba04c67a7 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 3 May 2023 14:59:19 -0700 Subject: [PATCH] Add QueueDeclareAsync to IChannel and a test for it. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add QueuDeclareAsync. Not working yet. First stab at implementing QueueDeclareAsync Finish implementing QueueDeclareAsync API Update Replace RPC lock with SemaphoreSlim that is also used for full-async methods. Add test of concurrent queue declarations. Update projects/RabbitMQ.Client/client/impl/ChannelBase.cs Co-authored-by: Stefán Jökull Sigurðarson Update projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs Co-authored-by: Stefán Jökull Sigurðarson PR review suggestions --- .../RabbitMQ.Client/client/api/IChannel.cs | 15 ++ .../RabbitMQ.Client/client/framing/Channel.cs | 16 +-- .../client/impl/AutorecoveringChannel.cs | 8 ++ .../client/impl/ChannelBase.cs | 128 +++++++++++++++--- .../client/impl/RpcContinuationQueue.cs | 19 +++ .../impl/SimpleBlockingRpcContinuation.cs | 33 +++-- .../Unit/APIApproval.Approve.verified.txt | 1 + projects/Unit/TestQueueDeclare.cs | 50 ++++++- 8 files changed, 226 insertions(+), 44 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 144e2c4dcc..3f7f4693e2 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -188,6 +188,7 @@ string BasicConsume( void BasicNack(ulong deliveryTag, bool multiple, bool requeue); #nullable enable + /// /// Publishes a message. /// @@ -198,6 +199,7 @@ string BasicConsume( /// void BasicPublish(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + /// /// Publishes a message. /// @@ -208,6 +210,7 @@ void BasicPublish(string exchange, string routingKey, in TPropertie /// void BasicPublish(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + /// /// Asynchronously publishes a message. /// @@ -218,6 +221,7 @@ void BasicPublish(CachedString exchange, CachedString routingKey, i /// ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + /// /// Asynchronously publishes a message. /// @@ -228,6 +232,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + #nullable disable /// @@ -361,6 +366,16 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou /// Optional; additional queue arguments, e.g. "x-queue-type" QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments); + /// + /// Asynchronously declares a queue. See the Queues guide to learn more. + /// + /// The name of the queue. Pass an empty string to make the server generate a name. + /// Should this queue will survive a broker restart? + /// Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes. + /// Should this queue be auto-deleted when its last consumer (if any) unsubscribes? + /// Optional; additional queue arguments, e.g. "x-queue-type" + ValueTask QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments); + /// /// Declares a queue. See the Queues guide to learn more. /// diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 2243ddcc5d..a59cceb1e3 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -197,15 +197,12 @@ public override void _Private_QueueBind(string queue, string exchange, string ro public override void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary arguments) { + /* + * Note: + * Even though nowait is a parameter, ChannelSend must be used + */ var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); - if (nowait) - { - ChannelSend(method); - } - else - { - ChannelSend(method); - } + ChannelSend(method); } public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait) @@ -382,8 +379,7 @@ protected override bool DispatchAsynchronous(in IncomingCommand cmd) } case ProtocolCommandId.QueueDeclareOk: { - HandleQueueDeclareOk(in cmd); - return true; + return HandleQueueDeclareOk(in cmd); } default: return false; } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index e8a7fd9c26..ed3a60efc9 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -430,6 +430,14 @@ public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool _connection.RecordQueue(new RecordedQueue(queue, queue.Length == 0, durable, exclusive, autoDelete, arguments)); } + public async ValueTask QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + ThrowIfDisposed(); + QueueDeclareOk result = await _innerChannel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments); + _connection.RecordQueue(new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments)); + return result; + } + public QueueDeclareOk QueueDeclarePassive(string queue) => InnerChannel.QueueDeclarePassive(queue); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 2c9fcfac7b..717aff3bfb 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -42,7 +42,6 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Util; namespace RabbitMQ.Client.Impl { @@ -57,7 +56,6 @@ internal abstract class ChannelBase : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); - private readonly object _rpcLock = new object(); private readonly object _confirmLock = new object(); private readonly LinkedList _pendingDeliveryTags = new LinkedList(); @@ -328,7 +326,8 @@ private void HandleCommand(in IncomingCommand cmd) { if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further. { - _continuationQueue.Next().HandleCommand(in cmd); + IRpcContinuation c = _continuationQueue.Next(); + c.HandleCommand(in cmd); } } @@ -337,12 +336,17 @@ protected void ChannelRpc(in TMethod method, ProtocolCommandId returnCo { var k = new SimpleBlockingRpcContinuation(); IncomingCommand reply; - lock (_rpcLock) + _rpcSemaphore.Wait(); + try { Enqueue(k); Session.Transmit(in method); k.GetReply(ContinuationTimeout, out reply); } + finally + { + _rpcSemaphore.Release(); + } reply.ReturnMethodBuffer(); @@ -358,12 +362,17 @@ protected TReturn ChannelRpc(in TMethod method, ProtocolComman var k = new SimpleBlockingRpcContinuation(); IncomingCommand reply; - lock (_rpcLock) + _rpcSemaphore.Wait(); + try { Enqueue(k); Session.Transmit(in method); k.GetReply(ContinuationTimeout, out reply); } + finally + { + _rpcSemaphore.Release(); + } if (reply.CommandId != returnCommandId) { @@ -783,13 +792,21 @@ protected void HandleConnectionUnblocked() Session.Connection.HandleConnectionUnblocked(); } - protected void HandleQueueDeclareOk(in IncomingCommand cmd) + protected bool HandleQueueDeclareOk(in IncomingCommand cmd) { - var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span); - cmd.ReturnMethodBuffer(); - var k = (QueueDeclareRpcContinuation)_continuationQueue.Next(); - k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. + if (_continuationQueue.TryPeek(out var k)) + { + _continuationQueue.Next(); + var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span); + cmd.ReturnMethodBuffer(); + k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); + k.HandleCommand(IncomingCommand.Empty); // release the continuation. + return true; + } + else + { + return false; + } } public abstract void _Private_BasicCancel(string consumerTag, bool nowait); @@ -844,12 +861,17 @@ public void BasicCancel(string consumerTag) { var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag }; - lock (_rpcLock) + _rpcSemaphore.Wait(); + try { Enqueue(k); _Private_BasicCancel(consumerTag, false); k.GetReply(ContinuationTimeout); } + finally + { + _rpcSemaphore.Release(); + } } public void BasicCancelNoWait(string consumerTag) @@ -872,7 +894,8 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool var k = new BasicConsumerRpcContinuation { m_consumer = consumer }; - lock (_rpcLock) + _rpcSemaphore.Wait(); + try { Enqueue(k); // Non-nowait. We have an unconventional means of getting @@ -881,6 +904,11 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool /*nowait:*/ false, arguments); k.GetReply(ContinuationTimeout); } + finally + { + _rpcSemaphore.Release(); + } + string actualConsumerTag = k.m_consumerTag; return actualConsumerTag; @@ -889,12 +917,18 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool public BasicGetResult BasicGet(string queue, bool autoAck) { var k = new BasicGetRpcContinuation(); - lock (_rpcLock) + + _rpcSemaphore.Wait(); + try { Enqueue(k); _Private_BasicGet(queue, autoAck); k.GetReply(ContinuationTimeout); } + finally + { + _rpcSemaphore.Release(); + } return k.m_result; } @@ -982,12 +1016,17 @@ public void BasicRecover(bool requeue) { var k = new SimpleBlockingRpcContinuation(); - lock (_rpcLock) + _rpcSemaphore.Wait(); + try { Enqueue(k); _Private_BasicRecover(requeue); k.GetReply(ContinuationTimeout); } + finally + { + _rpcSemaphore.Release(); + } } public abstract void BasicRecoverAsync(bool requeue); @@ -1065,6 +1104,11 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, b return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); } + public ValueTask QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + return QueueDeclareAsync(queue, false, durable, exclusive, autoDelete, arguments); + } + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); @@ -1196,17 +1240,45 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Library, private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { var k = new QueueDeclareRpcContinuation(); - lock (_rpcLock) + + _rpcSemaphore.Wait(); + try { Enqueue(k); _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); k.GetReply(ContinuationTimeout); } + finally + { + _rpcSemaphore.Release(); + } + QueueDeclareOk result = k.m_result; CurrentQueue = result.QueueName; return result; } + private async ValueTask QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + var k = new QueueDeclareAsyncRpcContinuation(); + await _rpcSemaphore.WaitAsync().ConfigureAwait(false); + try + { + Enqueue(k); + + + var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); + await ModelSendAsync(method).ConfigureAwait(false); + + QueueDeclareOk result = await k; + CurrentQueue = result.QueueName; + return result; + } + finally + { + _rpcSemaphore.Release(); + } + } public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation { @@ -1228,5 +1300,29 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation { public QueueDeclareOk m_result; } + + public class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation + { + public override void HandleCommand(in IncomingCommand cmd) + { + try + { + var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span); + var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); + if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk) + { + _tcs.TrySetResult(result); + } + else + { + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); + } + } + finally + { + cmd.ReturnMethodBuffer(); + } + } + } } } diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs index 43990d26ec..1b722bb8a0 100644 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs @@ -108,5 +108,24 @@ public IRpcContinuation Next() { return Interlocked.Exchange(ref _outstandingRpc, s_tmp); } + + ///Peek at the next waiting continuation. + /// + /// + /// It is an error to call this method when there are no + /// waiting continuations. + /// + /// + public bool TryPeek(out T continuation) where T : IRpcContinuation + { + if (_outstandingRpc is T result) + { + continuation = result; + return true; + } + + continuation = default; + return false; + } } } diff --git a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs b/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs index 32cb344b8a..bb40f6be82 100644 --- a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs +++ b/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs @@ -54,24 +54,29 @@ internal class ConnectionSecureOrTuneContinuation : AsyncRpcContinuation arguments); void QueueBindNoWait(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary arguments); RabbitMQ.Client.QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary arguments); + System.Threading.Tasks.ValueTask QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary arguments); void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary arguments); RabbitMQ.Client.QueueDeclareOk QueueDeclarePassive(string queue); uint QueueDelete(string queue, bool ifUnused, bool ifEmpty); diff --git a/projects/Unit/TestQueueDeclare.cs b/projects/Unit/TestQueueDeclare.cs index 02ea7516a9..8896468527 100644 --- a/projects/Unit/TestQueueDeclare.cs +++ b/projects/Unit/TestQueueDeclare.cs @@ -32,7 +32,7 @@ using System; using System.Collections.Generic; using System.Threading; - +using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; @@ -44,18 +44,26 @@ public TestQueueDeclare(ITestOutputHelper output) : base(output) { } + [Fact] + public async void TestQueueDeclareAsync() + { + string q = GenerateQueueName(); + QueueDeclareOk result = await _channel.QueueDeclareAsync(q, false, false, false, null); + Assert.Equal(q, result.QueueName); + } + [Fact] [Trait("Category", "RequireSMP")] public void TestConcurrentQueueDeclare() { string q = GenerateQueueName(); - Random rnd = new Random(); + var rnd = new Random(); - List ts = new List(); + var ts = new List(); NotSupportedException nse = null; for (int i = 0; i < 256; i++) { - Thread t = new Thread(() => + var t = new Thread(() => { try { @@ -81,5 +89,39 @@ public void TestConcurrentQueueDeclare() Assert.Null(nse); _channel.QueueDelete(q); } + + [Fact] + [Trait("Category", "RequireSMP")] + public async void TestConcurrentQueueDeclareAsync() + { + string q = GenerateQueueName(); + var rnd = new Random(); + + var ts = new List(); + NotSupportedException nse = null; + for (int i = 0; i < 256; i++) + { + async Task f() + { + try + { + // sleep for a random amount of time to increase the chances + // of thread interleaving. MK. + await Task.Delay(rnd.Next(5, 50)); + QueueDeclareOk r = await _channel.QueueDeclareAsync(q, false, false, false, null); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + ts.Add(t); + } + + await Task.WhenAll(ts); + Assert.Null(nse); + _channel.QueueDelete(q); + } } }