Skip to content

Commit

Permalink
* Ensure BasicPublishAsync API and extensions use the same paramete…
Browse files Browse the repository at this point in the history
…r order as the 6.x version (cc @danielmarbach)
  • Loading branch information
lukebakken committed Sep 6, 2024
1 parent fd0f5bd commit 5abd912
Show file tree
Hide file tree
Showing 22 changed files with 118 additions and 64 deletions.
10 changes: 6 additions & 4 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,6 @@ RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
Expand Down Expand Up @@ -859,8 +857,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary<string!, object?>? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
Expand Down Expand Up @@ -895,4 +891,10 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel
RabbitMQ.Client.ICredentialsProvider.Name.get -> string!
RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<byte[]!>!
readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider!
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// </summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties,
ReadOnlyMemory<byte> body = default, bool mandatory = false,
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

Expand All @@ -215,15 +215,15 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TPr
/// </summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties,
ReadOnlyMemory<byte> body = default, bool mandatory = false,
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

Expand Down
62 changes: 52 additions & 10 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
cancellationToken);

/// <summary>
/// (Extension method) Convenience overload of BasicPublish.
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// The publication occurs with mandatory=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel,
PublicationAddress addr,
Expand All @@ -85,28 +85,70 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel,
CancellationToken cancellationToken = default)
where T : IReadOnlyBasicProperties, IAmqpHeader =>
channel.BasicPublishAsync(exchange: addr.ExchangeName, routingKey: addr.RoutingKey,
basicProperties: basicProperties, body: body, mandatory: false,
mandatory: false, basicProperties: basicProperties, body: body,
cancellationToken);

/// <summary>
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
ReadOnlyMemory<byte> body = default,
bool mandatory = false,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory,
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body,
cancellationToken);

/// <summary>
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
ReadOnlyMemory<byte> body = default,
bool mandatory = false,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
basicProperties: EmptyBasicProperty.Empty, body: body, mandatory: mandatory,
cancellationToken);
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body,
cancellationToken);

/// <summary>
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
/// </summary>
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
bool mandatory,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body,
cancellationToken);

/// <summary>
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
/// </summary>
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
bool mandatory,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body,
cancellationToken);

/// <summary>
/// Asynchronously declare a queue.
Expand Down
20 changes: 12 additions & 8 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,21 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
public ValueTask<BasicGetResult?> BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
=> InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties,
ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory, cancellationToken);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties,
ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory, cancellationToken);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);

public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
CancellationToken cancellationToken)
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,8 @@ await ModelSendAsync(method, k.CancellationToken)
}

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (ConfirmsAreEnabled)
Expand Down Expand Up @@ -1004,8 +1004,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
}

public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (ConfirmsAreEnabled)
Expand Down
3 changes: 2 additions & 1 deletion projects/Test/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
{
using var channel = await connection.CreateChannelAsync(); // New channel for each message
await Task.Delay(1000);
await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: msg);
Console.WriteLine($"Sent message {i}");
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task TestExchangeToExchangeBindingRecovery()
{
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true);
await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true);
await _channel.WaitForConfirmsOrDieAsync();
await AssertMessageCountAsync(q, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect()

try
{
await _channel.BasicPublishAsync(string.Empty, testQueueName, properties, _messageBody);
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName,
mandatory: false, basicProperties: properties, body: _messageBody);
}
catch (Exception e)
{
Expand Down
Loading

0 comments on commit 5abd912

Please sign in to comment.