Skip to content

Commit

Permalink
Merge pull request #1700 from rabbitmq/lukebakken/combine-channel-and…
Browse files Browse the repository at this point in the history
…-channelbase

Integrate `Channel` into `ChannelBase`
  • Loading branch information
lukebakken authored Oct 18, 2024
2 parents fe3b262 + f2a6f8c commit ecd948c
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
{
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
{
internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency)
internal AsyncConsumerDispatcher(Channel channel, ushort concurrency)
: base(channel, concurrency)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,35 @@

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.ConsumerDispatching
{
internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher
{
protected readonly ChannelBase _channel;
protected readonly ChannelReader<WorkStruct> _reader;
private readonly ChannelWriter<WorkStruct> _writer;
protected readonly Impl.Channel _channel;
protected readonly System.Threading.Channels.ChannelReader<WorkStruct> _reader;
private readonly System.Threading.Channels.ChannelWriter<WorkStruct> _writer;
private readonly Task _worker;
private readonly ushort _concurrency;
private bool _quiesce = false;
private bool _disposed;

internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
_channel = channel;
_concurrency = concurrency;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions

var channelOpts = new System.Threading.Channels.UnboundedChannelOptions
{
SingleReader = _concurrency == 1,
SingleWriter = false,
AllowSynchronousContinuations = false
});
};

var workChannel = System.Threading.Channels.Channel.CreateUnbounded<WorkStruct>(channelOpts);
_reader = workChannel.Reader;
_writer = workChannel.Writer;

Expand Down
155 changes: 0 additions & 155 deletions projects/RabbitMQ.Client/Framing/Channel.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class ChannelBase : IChannel, IRecoverable
internal class Channel : IChannel, IRecoverable
{
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
Expand All @@ -71,7 +71,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
Expand All @@ -92,6 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChan
}

internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public TimeSpan ContinuationTimeout { get; set; }

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -192,7 +193,7 @@ public void MaybeSetConnectionStartException(Exception ex)
}
}

protected void TakeOver(ChannelBase other)
protected void TakeOver(Channel other)
{
_basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper);
_basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper);
Expand Down Expand Up @@ -355,8 +356,6 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

protected abstract Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken);

protected bool Enqueue(IRpcContinuation k)
{
if (IsOpen)
Expand Down Expand Up @@ -873,14 +872,26 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
}
}

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken);
public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken)
{
var method = new BasicAck(deliveryTag, multiple);
return ModelSendAsync(in method, cancellationToken);
}

public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
CancellationToken cancellationToken);
public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicNack(deliveryTag, multiple, requeue);
return ModelSendAsync(in method, cancellationToken);
}

public abstract ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
CancellationToken cancellationToken);
public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicReject(deliveryTag, requeue);
return ModelSendAsync(in method, cancellationToken);
}

public async Task BasicCancelAsync(string consumerTag, bool noWait,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -1881,5 +1892,93 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
}
}
}

/// <summary>
/// Returning <c>true</c> from this method means that the command was server-originated,
/// and handled already.
/// Returning <c>false</c> (the default) means that the incoming command is the response to
/// a client-initiated RPC call, and must be handled.
/// </summary>
/// <param name="cmd">The incoming command from the AMQP server</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns></returns>
private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
switch (cmd.CommandId)
{
case ProtocolCommandId.BasicCancel:
{
// Note: always returns true
return HandleBasicCancelAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicDeliver:
{
// Note: always returns true
return HandleBasicDeliverAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicAck:
{
return HandleBasicAck(cmd, cancellationToken);
}
case ProtocolCommandId.BasicNack:
{
return HandleBasicNack(cmd, cancellationToken);
}
case ProtocolCommandId.BasicReturn:
{
// Note: always returns true
return HandleBasicReturn(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelClose:
{
// Note: always returns true
return HandleChannelCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelCloseOk:
{
// Note: always returns true
return HandleChannelCloseOkAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelFlow:
{
// Note: always returns true
return HandleChannelFlowAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionBlocked:
{
// Note: always returns true
return HandleConnectionBlockedAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionClose:
{
// Note: always returns true
return HandleConnectionCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionSecure:
{
// Note: always returns true
return HandleConnectionSecureAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionStart:
{
// Note: always returns true
return HandleConnectionStartAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionTune:
{
// Note: always returns true
return HandleConnectionTuneAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionUnblocked:
{
// Note: always returns true
return HandleConnectionUnblockedAsync(cancellationToken);
}
default:
{
return Task.FromResult(false);
}
}
}
}
}
Loading

0 comments on commit ecd948c

Please sign in to comment.