Skip to content

Commit

Permalink
improve autorecovering eventing
Browse files Browse the repository at this point in the history
align CloseReason implementation
  • Loading branch information
bollhals committed Jan 18, 2021
1 parent 1d02cca commit 779d487
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 465 deletions.
2 changes: 2 additions & 0 deletions projects/Benchmarks/Eventing/Eventing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public static IEnumerable<object> CountSource()

public class Eventing_AddRemove : EventingBase
{
#pragma warning disable 67 // Required for add / remove
private event EventHandler<ulong> _event;
#pragma warning restore 67

[Benchmark(Baseline = true)]
[ArgumentsSource(nameof(CountSource))]
Expand Down
131 changes: 15 additions & 116 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal sealed class AutorecoveringConnection : IConnection
{
private bool _disposed = false;
private readonly object _eventLock = new object();

private bool _disposed;
private Connection _delegate;
private readonly ConnectionFactory _factory;

Expand All @@ -59,19 +57,12 @@ internal sealed class AutorecoveringConnection : IConnection
private readonly object _recordedEntitiesLock = new object();

private readonly Dictionary<string, RecordedExchange> _recordedExchanges = new Dictionary<string, RecordedExchange>();

private readonly Dictionary<string, RecordedQueue> _recordedQueues = new Dictionary<string, RecordedQueue>();

private readonly Dictionary<RecordedBinding, byte> _recordedBindings = new Dictionary<RecordedBinding, byte>();

private readonly Dictionary<string, RecordedConsumer> _recordedConsumers = new Dictionary<string, RecordedConsumer>();

private readonly List<AutorecoveringModel> _models = new List<AutorecoveringModel>();

private EventHandler<ConnectionBlockedEventArgs> _recordedBlockedEventHandlers;
private EventHandler<ShutdownEventArgs> _recordedShutdownEventHandlers;
private EventHandler<EventArgs> _recordedUnblockedEventHandlers;

public AutorecoveringConnection(ConnectionFactory factory, string clientProvidedName = null)
{
_factory = factory;
Expand Down Expand Up @@ -102,88 +93,26 @@ public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryEr

public event EventHandler<CallbackExceptionEventArgs> CallbackException
{
add
{
ThrowIfDisposed();
lock (_eventLock)
{
_delegate.CallbackException += value;
}
}
remove
{
ThrowIfDisposed();
lock (_eventLock)
{
_delegate.CallbackException -= value;
}
}
add => Delegate.CallbackException += value;
remove => Delegate.CallbackException -= value;
}

public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
{
add
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedBlockedEventHandlers += value;
_delegate.ConnectionBlocked += value;
}
}
remove
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedBlockedEventHandlers -= value;
_delegate.ConnectionBlocked -= value;
}
}
add => Delegate.ConnectionBlocked += value;
remove => Delegate.ConnectionBlocked -= value;
}

public event EventHandler<ShutdownEventArgs> ConnectionShutdown
{
add
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedShutdownEventHandlers += value;
_delegate.ConnectionShutdown += value;
}
}
remove
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedShutdownEventHandlers -= value;
_delegate.ConnectionShutdown -= value;
}
}
add => Delegate.ConnectionShutdown += value;
remove => Delegate.ConnectionShutdown -= value;
}

public event EventHandler<EventArgs> ConnectionUnblocked
{
add
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedUnblockedEventHandlers += value;
_delegate.ConnectionUnblocked += value;
}
}
remove
{
ThrowIfDisposed();
lock (_eventLock)
{
_recordedUnblockedEventHandlers -= value;
_delegate.ConnectionUnblocked -= value;
}
}
add => Delegate.ConnectionUnblocked += value;
remove => Delegate.ConnectionUnblocked -= value;
}

public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
Expand All @@ -204,8 +133,6 @@ public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChang

public ushort ChannelMax => Delegate.ChannelMax;

public ConsumerWorkService ConsumerWorkService => Delegate.ConsumerWorkService;

public IDictionary<string, object> ClientProperties => Delegate.ClientProperties;

public ShutdownEventArgs CloseReason => Delegate.CloseReason;
Expand Down Expand Up @@ -244,10 +171,6 @@ private bool TryPerformAutomaticRecovery()
{
if (TryRecoverConnectionDelegate())
{
RecoverConnectionShutdownHandlers();
RecoverConnectionBlockedHandlers();
RecoverConnectionUnblockedHandlers();

RecoverModels();
if (_factory.TopologyRecoveryEnabled)
{
Expand All @@ -261,10 +184,8 @@ private bool TryPerformAutomaticRecovery()

return true;
}
else
{
ESLog.Warn("Connection delegate was manually closed. Aborted recovery.");
}

ESLog.Warn("Connection delegate was manually closed. Aborted recovery.");
}
catch (Exception e)
{
Expand Down Expand Up @@ -446,23 +367,16 @@ public void Init(IEndpointResolver endpoints)
private void Init(IFrameHandler fh)
{
ThrowIfDisposed();
_delegate = new Connection(_factory, false,
fh, ClientProvidedName);

_delegate = new Connection(_factory, false, fh, ClientProvidedName);
_recoveryTask = Task.Run(MainRecoveryLoop);

EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
ConnectionShutdown += (_, args) =>
{
if (ShouldTriggerConnectionRecovery(args))
{
_recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery);
}
};
lock (_eventLock)
{
ConnectionShutdown += recoveryListener;
_recordedShutdownEventHandlers += recoveryListener;
}
}

///<summary>API-side invocation of updating the secret.</summary>
Expand Down Expand Up @@ -622,10 +536,6 @@ private void Dispose(bool disposing)
{
_models.Clear();
_delegate = null;
_recordedBlockedEventHandlers = null;
_recordedShutdownEventHandlers = null;
_recordedUnblockedEventHandlers = null;

_disposed = true;
}
}
Expand Down Expand Up @@ -686,22 +596,15 @@ private void RecoverBindings()
}
}

private void RecoverConnectionBlockedHandlers()
{
ThrowIfDisposed();
lock (_eventLock)
{
_delegate.ConnectionBlocked += _recordedBlockedEventHandlers;
}
}

private bool TryRecoverConnectionDelegate()
{
ThrowIfDisposed();
try
{
IFrameHandler fh = _endpoints.SelectOne(_factory.CreateFrameHandler);
var defunctConnection = _delegate;
_delegate = new Connection(_factory, false, fh, ClientProvidedName);
_delegate.TakeOver(defunctConnection);
return true;
}
catch (Exception e)
Expand All @@ -717,10 +620,6 @@ private bool TryRecoverConnectionDelegate()
return false;
}

private void RecoverConnectionShutdownHandlers() => Delegate.ConnectionShutdown += _recordedShutdownEventHandlers;

private void RecoverConnectionUnblockedHandlers() => Delegate.ConnectionUnblocked += _recordedUnblockedEventHandlers;

private void RecoverConsumers()
{
ThrowIfDisposed();
Expand Down
Loading

0 comments on commit 779d487

Please sign in to comment.