Skip to content

Commit

Permalink
Add QueueDeclareAsync to IChannel and a test for it.
Browse files Browse the repository at this point in the history
Add QueuDeclareAsync. Not working yet.

First stab at implementing QueueDeclareAsync

Finish implementing QueueDeclareAsync

API Update

Replace RPC lock with SemaphoreSlim that is also used for full-async methods.

Add test of concurrent queue declarations.

Update projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Co-authored-by: Stefán Jökull Sigurðarson <[email protected]>

Update projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs

Co-authored-by: Stefán Jökull Sigurðarson <[email protected]>

PR review suggestions
  • Loading branch information
lukebakken committed May 11, 2023
1 parent 26e5657 commit 549f461
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 44 deletions.
15 changes: 15 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ string BasicConsume(
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);

#nullable enable

/// <summary>
/// Publishes a message.
/// </summary>
Expand All @@ -198,6 +199,7 @@ string BasicConsume(
/// </remarks>
void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Publishes a message.
/// </summary>
Expand All @@ -208,6 +210,7 @@ void BasicPublish<TProperties>(string exchange, string routingKey, in TPropertie
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
Expand All @@ -218,6 +221,7 @@ void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, i
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
Expand All @@ -228,6 +232,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

#nullable disable

/// <summary>
Expand Down Expand Up @@ -361,6 +366,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>
/// Asynchronously declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
/// <param name="durable">Should this queue will survive a broker restart?</param>
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
Expand Down
16 changes: 6 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,12 @@ public override void _Private_QueueBind(string queue, string exchange, string ro

public override void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary<string, object> arguments)
{
/*
* Note:
* Even though nowait is a parameter, ChannelSend must be used
*/
var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ChannelSend(method);
}
else
{
ChannelSend(method);
}
ChannelSend(method);
}

public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait)
Expand Down Expand Up @@ -382,8 +379,7 @@ protected override bool DispatchAsynchronous(in IncomingCommand cmd)
}
case ProtocolCommandId.QueueDeclareOk:
{
HandleQueueDeclareOk(in cmd);
return true;
return HandleQueueDeclareOk(in cmd);
}
default: return false;
}
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,14 @@ public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool
_connection.RecordQueue(new RecordedQueue(queue, queue.Length == 0, durable, exclusive, autoDelete, arguments));
}

public async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
ThrowIfDisposed();
QueueDeclareOk result = await _innerChannel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments);
_connection.RecordQueue(new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments));
return result;
}

public QueueDeclareOk QueueDeclarePassive(string queue)
=> InnerChannel.QueueDeclarePassive(queue);

Expand Down
128 changes: 112 additions & 16 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -57,7 +56,6 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _rpcLock = new object();
private readonly object _confirmLock = new object();
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();

Expand Down Expand Up @@ -328,7 +326,8 @@ private void HandleCommand(in IncomingCommand cmd)
{
if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further.
{
_continuationQueue.Next().HandleCommand(in cmd);
IRpcContinuation c = _continuationQueue.Next();
c.HandleCommand(in cmd);
}
}

Expand All @@ -337,12 +336,17 @@ protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCo
{
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply;
lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

reply.ReturnMethodBuffer();

Expand All @@ -358,12 +362,17 @@ protected TReturn ChannelRpc<TMethod, TReturn>(in TMethod method, ProtocolComman
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply;

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

if (reply.CommandId != returnCommandId)
{
Expand Down Expand Up @@ -783,13 +792,21 @@ protected void HandleConnectionUnblocked()
Session.Connection.HandleConnectionUnblocked();
}

protected void HandleQueueDeclareOk(in IncomingCommand cmd)
protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
{
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var k = (QueueDeclareRpcContinuation)_continuationQueue.Next();
k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
if (_continuationQueue.TryPeek<QueueDeclareRpcContinuation>(out var k))
{
_continuationQueue.Next();
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
return true;
}
else
{
return false;
}
}

public abstract void _Private_BasicCancel(string consumerTag, bool nowait);
Expand Down Expand Up @@ -844,12 +861,17 @@ public void BasicCancel(string consumerTag)
{
var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag };

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicCancel(consumerTag, false);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}
}

public void BasicCancelNoWait(string consumerTag)
Expand All @@ -872,7 +894,8 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool

var k = new BasicConsumerRpcContinuation { m_consumer = consumer };

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
// Non-nowait. We have an unconventional means of getting
Expand All @@ -881,6 +904,11 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool
/*nowait:*/ false, arguments);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

string actualConsumerTag = k.m_consumerTag;

return actualConsumerTag;
Expand All @@ -889,12 +917,18 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool
public BasicGetResult BasicGet(string queue, bool autoAck)
{
var k = new BasicGetRpcContinuation();
lock (_rpcLock)

_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicGet(queue, autoAck);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

return k.m_result;
}
Expand Down Expand Up @@ -982,12 +1016,17 @@ public void BasicRecover(bool requeue)
{
var k = new SimpleBlockingRpcContinuation();

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicRecover(requeue);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}
}

public abstract void BasicRecoverAsync(bool requeue);
Expand Down Expand Up @@ -1065,6 +1104,11 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, b
return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments);
}

public ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
return QueueDeclareAsync(queue, false, durable, exclusive, autoDelete, arguments);
}

public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
_Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments);
Expand Down Expand Up @@ -1196,17 +1240,45 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Library,
private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
var k = new QueueDeclareRpcContinuation();
lock (_rpcLock)

_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

QueueDeclareOk result = k.m_result;
CurrentQueue = result.QueueName;
return result;
}

private async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
var k = new QueueDeclareAsyncRpcContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
try
{
Enqueue(k);


var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
await ModelSendAsync(method).ConfigureAwait(false);

QueueDeclareOk result = await k;
CurrentQueue = result.QueueName;
return result;
}
finally
{
_rpcSemaphore.Release();
}
}

public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
{
Expand All @@ -1228,5 +1300,29 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
{
public QueueDeclareOk m_result;
}

public class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation<QueueDeclareOk>
{
public override void HandleCommand(in IncomingCommand cmd)
{
try
{
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
{
_tcs.TrySetResult(result);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}
}
}
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,24 @@ public IRpcContinuation Next()
{
return Interlocked.Exchange(ref _outstandingRpc, s_tmp);
}

///<summary>Peek at the next waiting continuation.</summary>
///<remarks>
///<para>
/// It is an error to call this method when there are no
/// waiting continuations.
///</para>
///</remarks>
public bool TryPeek<T>(out T continuation) where T : IRpcContinuation
{
if (_outstandingRpc is T result)
{
continuation = result;
return true;
}

continuation = default;
return false;
}
}
}
Loading

0 comments on commit 549f461

Please sign in to comment.