diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 3561bcac7..cac3b3cbe 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.IO.Pipelines; using System.Net.Sockets; using System.Runtime.CompilerServices; @@ -29,6 +30,8 @@ internal sealed class CommandWriter : IAsyncDisposable private const int MinSegmentSize = 16384; private readonly ILogger _logger; + private readonly bool _trace; + private readonly string _name; private readonly NatsConnection _connection; private readonly ObjectPool _pool; private readonly int _arrayPoolInitialSize; @@ -52,9 +55,11 @@ internal sealed class CommandWriter : IAsyncDisposable private CancellationTokenSource? _ctsReader; private volatile bool _disposed; - public CommandWriter(NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) + public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) { _logger = opts.LoggerFactory.CreateLogger(); + _trace = _logger.IsEnabled(LogLevel.Trace); + _name = name; _connection = connection; _pool = pool; @@ -78,10 +83,14 @@ public CommandWriter(NatsConnection connection, ObjectPool pool, NatsOpts opts, useSynchronizationContext: false)); _pipeReader = pipe.Reader; _pipeWriter = pipe.Writer; + + _logger.LogDebug(NatsLogEvents.Buffer, "Created {Name}", _name); } public void Reset(ISocketConnection socketConnection) { + _logger.LogDebug(NatsLogEvents.Buffer, "Resetting {Name}", _name); + lock (_lock) { _socketConnection = socketConnection; @@ -104,6 +113,8 @@ await ReaderLoopAsync( public async Task CancelReaderLoopAsync() { + _logger.LogDebug(NatsLogEvents.Buffer, "Canceling reader loop"); + CancellationTokenSource? cts; Task? readerTask; lock (_lock) @@ -130,10 +141,14 @@ public async Task CancelReaderLoopAsync() // closed by the time we get here or soon after. await readerTask.WaitAsync(_cts.Token).ConfigureAwait(false); } + + _logger.LogDebug(NatsLogEvents.Buffer, "Cancelled reader loop successfully"); } public async ValueTask DisposeAsync() { + _logger.LogDebug(NatsLogEvents.Buffer, "Disposing {Name}", _name); + if (_disposed) { return; @@ -163,6 +178,11 @@ public async ValueTask DisposeAsync() public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "CONNECT"); + } + #pragma warning disable CA2016 #pragma warning disable VSTHRD103 if (!_semLock.Wait(0)) @@ -197,6 +217,11 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "PING"); + } + #pragma warning disable CA2016 #pragma warning disable VSTHRD103 if (!_semLock.Wait(0)) @@ -232,6 +257,11 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati public ValueTask PongAsync(CancellationToken cancellationToken = default) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "PONG"); + } + #pragma warning disable CA2016 #pragma warning disable VSTHRD103 if (!_semLock.Wait(0)) @@ -266,6 +296,11 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default) public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", subject, replyTo); + } + NatsPooledBufferWriter? headersBuffer = null; if (headers != null) { @@ -348,6 +383,11 @@ public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "SUB {Subject} {QueueGroup} {MaxMsgs}", subject, queueGroup, maxMsgs); + } + #pragma warning disable CA2016 #pragma warning disable VSTHRD103 if (!_semLock.Wait(0)) @@ -382,6 +422,11 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "UNSUB {Sid} {MaxMsgs}", sid, maxMsgs); + } + #pragma warning disable CA2016 #pragma warning disable VSTHRD103 if (!_semLock.Wait(0)) @@ -445,6 +490,10 @@ private static async Task ReaderLoopAsync( { try { + var trace = logger.IsEnabled(LogLevel.Trace); + logger.LogDebug(NatsLogEvents.Buffer, "Starting send buffer reader loop"); + + var stopwatch = Stopwatch.StartNew(); var examinedOffset = 0; var pending = 0; while (true) @@ -481,7 +530,18 @@ private static async Task ReaderLoopAsync( Exception? sendEx = null; try { + if (trace) + { + stopwatch.Restart(); + } + sent = await connection.SendAsync(sendMem).ConfigureAwait(false); + + if (trace) + { + stopwatch.Stop(); + logger.LogTrace(NatsLogEvents.Buffer, "Socket.SendAsync Size: {Sent}/{Size} Elapsed: {ElapsedMs}ms", sent, sendMem.Length, stopwatch.Elapsed.TotalMilliseconds); + } } catch (Exception ex) { @@ -589,10 +649,12 @@ private static async Task ReaderLoopAsync( catch (OperationCanceledException) { // Expected during shutdown + logger.LogDebug(NatsLogEvents.Buffer, "Operation canceled in send buffer reader loop (expected during shutdown)"); } catch (ObjectDisposedException) { // Expected during shutdown + logger.LogDebug(NatsLogEvents.Buffer, "Object disposed in send buffer reader loop (expected during shutdown)"); } catch (SocketException e) { @@ -613,6 +675,8 @@ private static async Task ReaderLoopAsync( { logger.LogError(NatsLogEvents.Buffer, e, "Unexpected error in send buffer reader loop"); } + + logger.LogDebug(NatsLogEvents.Buffer, "Exiting send buffer reader loop"); } /// diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs index f6c70427f..7effc9d49 100644 --- a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -8,7 +8,7 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing) { - CommandWriter = new CommandWriter(connection, pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan); + CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan); CommandWriter.Reset(socketConnection); } diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index 096ad83ee..c0d426d15 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -163,6 +163,10 @@ private async Task ReadLoopAsync() { // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg // MSG [reply-to] <#bytes>\r\n[payload] + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "MSG"); + } // Try to find before \n var positionBeforePayload = buffer.PositionOf((byte)'\n'); @@ -220,6 +224,10 @@ private async Task ReadLoopAsync() { // https://docs.nats.io/reference/reference-protocols/nats-protocol#hmsg // HMSG [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "HMSG"); + } // Find the end of 'HMSG' first message line var positionBeforeNatsHeader = buffer.PositionOf((byte)'\n'); @@ -309,6 +317,11 @@ private async ValueTask> DispatchCommandAsync(int code, R if (code == ServerOpCodes.Ping) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "PING"); + } + const int PingSize = 6; // PING\r\n await _connection.PongAsync().ConfigureAwait(false); // return pong to server @@ -324,6 +337,11 @@ private async ValueTask> DispatchCommandAsync(int code, R } else if (code == ServerOpCodes.Pong) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "PONG"); + } + const int PongSize = 6; // PONG\r\n _connection.ResetPongCount(); // reset count for PingTimer @@ -345,6 +363,11 @@ private async ValueTask> DispatchCommandAsync(int code, R } else if (code == ServerOpCodes.Error) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "ERR"); + } + // try to get \n. var position = buffer.PositionOf((byte)'\n'); if (position == null) @@ -367,6 +390,11 @@ private async ValueTask> DispatchCommandAsync(int code, R } else if (code == ServerOpCodes.Ok) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "OK"); + } + const int OkSize = 5; // +OK\r\n if (length < OkSize) @@ -380,6 +408,11 @@ private async ValueTask> DispatchCommandAsync(int code, R } else if (code == ServerOpCodes.Info) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Protocol, "INFO"); + } + // try to get \n. var position = buffer.PositionOf((byte)'\n'); diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index 7e285d1a5..e422d7d03 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -18,6 +18,7 @@ internal sealed record SubscriptionMetadata(int Sid); internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable { private readonly ILogger _logger; + private readonly bool _trace; private readonly bool _debug; private readonly object _gate = new(); private readonly NatsConnection _connection; @@ -39,6 +40,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) _inboxPrefix = inboxPrefix; _logger = _connection.Opts.LoggerFactory.CreateLogger(); _debug = _logger.IsEnabled(LogLevel.Debug); + _trace = _logger.IsEnabled(LogLevel.Trace); _cts = new CancellationTokenSource(); _cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval; _timer = Task.Run(CleanupAsync); @@ -90,6 +92,11 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Subscription, "Received subscription data for {Subject}/{Sid}", subject, sid); + } + int? orphanSid = null; lock (_gate) { @@ -97,6 +104,11 @@ public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, i { if (sidMetadata.WeakReference.TryGetTarget(out var sub)) { + if (_trace) + { + _logger.LogTrace(NatsLogEvents.Subscription, "Found subscription handler for {Subject}/{Sid}", subject, sid); + } + return sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer); } else @@ -182,6 +194,11 @@ public ValueTask RemoveAsync(NatsSubBase sub) /// Enumerable list of commands public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter) { + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "Reconnect commands requested"); + } + var subs = new List<(NatsSubBase, int)>(); lock (_gate) { @@ -191,12 +208,21 @@ public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter) { subs.Add((sub, sid)); } + else + { + _logger.LogError(NatsLogEvents.Subscription, "While reconnecting found subscription GCd but was never disposed {SidMetadataSubject}/{Sid}", sidMetadata.Subject, sid); + } } } foreach (var (sub, sid) in subs) { await sub.WriteReconnectCommandsAsync(commandWriter, sid).ConfigureAwait(false); + + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "Wrote reconnect commands for subscription {Subject}", sub.Subject); + } } } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index fbdd77b62..93d2ab021 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -52,6 +52,7 @@ public partial class NatsConnection : INatsConnection private int _pongCount; private int _connectionState; private int _isDisposed; + private int _reconnectCount; // when reconnected, make new instance. private ISocketConnection? _socket; @@ -73,6 +74,7 @@ public NatsConnection() public NatsConnection(NatsOpts opts) { + _logger = opts.LoggerFactory.CreateLogger(); Opts = opts; ConnectionState = NatsConnectionState.Closed; _waitForOpenConnection = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -80,10 +82,9 @@ public NatsConnection(NatsOpts opts) _pool = new ObjectPool(opts.ObjectPoolSize); _name = opts.Name; Counter = new ConnectionStatsCounter(); - CommandWriter = new CommandWriter(this, _pool, Opts, Counter, EnqueuePing); + CommandWriter = new CommandWriter("main", this, _pool, Opts, Counter, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); - _logger = opts.LoggerFactory.CreateLogger(); _clientOpts = ClientOpts.Create(Opts); HeaderParser = new NatsHeaderParser(opts.HeaderEncoding); _defaultSubscriptionChannelOpts = new BoundedChannelOptions(opts.SubPendingChannelCapacity) @@ -119,8 +120,12 @@ public NatsConnection(NatsOpts opts) public NatsConnectionState ConnectionState { - get => (NatsConnectionState)Volatile.Read(ref _connectionState); - private set => Interlocked.Exchange(ref _connectionState, (int)value); + get => (NatsConnectionState)Interlocked.CompareExchange(ref _connectionState, 0, 0); + private set + { + _logger.LogDebug(NatsLogEvents.Connection, "Connection state is changing from {OldState} to {NewState}", ConnectionState, value); + Interlocked.Exchange(ref _connectionState, (int)value); + } } public INatsServerInfo? ServerInfo => WritableServerInfo; // server info is set when received INFO @@ -388,6 +393,8 @@ private async ValueTask InitialConnectAsync() private async ValueTask SetupReaderWriterAsync(bool reconnect) { + _logger.LogDebug(NatsLogEvents.Connection, "Setup reader and writer"); + if (_currentConnectUri!.IsSeed) _lastSeedConnectUri = _currentConnectUri; @@ -508,18 +515,28 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) await DisposeSocketAsync(true).ConfigureAwait(false); throw; } + + _logger.LogDebug(NatsLogEvents.Connection, "Setup reader and writer completed successfully"); } private async void ReconnectLoop() { + var reconnectCount = Interlocked.Increment(ref _reconnectCount); + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect loop started [{ReconnectCount}]", reconnectCount); + var debug = _logger.IsEnabled(LogLevel.Debug); + var stopwatch = Stopwatch.StartNew(); + try { // If dispose this client, WaitForClosed throws OperationCanceledException so stop reconnect-loop correctly. await _socket!.WaitForClosed.ConfigureAwait(false); + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect loop connection closed [{ReconnectCount}]", reconnectCount); + await CommandWriter.CancelReaderLoopAsync().ConfigureAwait(false); - _logger.LogTrace(NatsLogEvents.Connection, "Connection {Name} is closed. Will cleanup and reconnect", _name); + _logger.LogDebug(NatsLogEvents.Connection, "Connection {Name} is closed. Will cleanup and reconnect [{ReconnectCount}]", _name, reconnectCount); + lock (_gate) { ConnectionState = NatsConnectionState.Reconnecting; @@ -555,11 +572,14 @@ private async void ReconnectLoop() NatsUri? url = null; CONNECT_AGAIN: + _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect [{ReconnectCount}]", reconnectCount); + if (IsDisposed) { // No point in trying to reconnect. - // This can happen if the we're disposed while we're waiting for the next reconnect + // This can happen if we're disposed while we're waiting for the next reconnect // and potentially gets us stuck in a reconnect loop. + _logger.LogDebug(NatsLogEvents.Connection, "Disposed, no point in trying to reconnect [{ReconnectCount}]", reconnectCount); return; } @@ -572,7 +592,7 @@ private async void ReconnectLoop() if (OnConnectingAsync != null) { var target = (url.Host, url.Port); - _logger.LogInformation(NatsLogEvents.Connection, "Try to invoke OnConnectingAsync before connect to NATS"); + _logger.LogInformation(NatsLogEvents.Connection, "Try to invoke OnConnectingAsync before connect to NATS [{ReconnectCount}]", reconnectCount); var newTarget = await OnConnectingAsync(target).ConfigureAwait(false); if (newTarget.Host != target.Host || newTarget.Port != target.Port) @@ -581,15 +601,17 @@ private async void ReconnectLoop() } } - _logger.LogInformation(NatsLogEvents.Connection, "Tried to connect NATS {Url}", url); + _logger.LogInformation(NatsLogEvents.Connection, "Tried to connect NATS {Url} [{ReconnectCount}]", url, reconnectCount); if (url.IsWebSocket) { + _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount); var conn = new WebSocketConnection(); await conn.ConnectAsync(url.Uri, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; } else { + _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using TCP {Url} [{ReconnectCount}]", url, reconnectCount); var conn = new TcpConnection(_logger); await conn.ConnectAsync(url.Host, url.Port, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; @@ -597,6 +619,7 @@ private async void ReconnectLoop() if (Opts.TlsOpts.EffectiveMode(url) == TlsMode.Implicit) { // upgrade TcpConnection to SslConnection + _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect and upgrading to TLS {Url} [{ReconnectCount}]", url, reconnectCount); var sslConnection = conn.UpgradeToSslStreamConnection(Opts.TlsOpts); await sslConnection.AuthenticateAsClientAsync(FixTlsHost(url), Opts.ConnectTimeout).ConfigureAwait(false); _socket = sslConnection; @@ -607,6 +630,7 @@ private async void ReconnectLoop() } else { + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect URLs exhausted, retrying from the beginning [{ReconnectCount}]", reconnectCount); urlEnumerator.Dispose(); urlEnumerator = urls.AsEnumerable().GetEnumerator(); goto CONNECT_AGAIN; @@ -622,13 +646,24 @@ private async void ReconnectLoop() return; } - if (url != null) + _logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url} [{ReconnectCount}]", url, reconnectCount); + + _eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty))); + + if (debug) { - _logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url}", url); + stopwatch.Restart(); + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect wait with jitter [{ReconnectCount}]", reconnectCount); } - _eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty))); await WaitWithJitterAsync().ConfigureAwait(false); + + if (debug) + { + stopwatch.Stop(); + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect wait over after {WaitMs}ms [{ReconnectCount}]", stopwatch.ElapsedMilliseconds, reconnectCount); + } + goto CONNECT_AGAIN; } @@ -636,7 +671,7 @@ private async void ReconnectLoop() { _connectRetry = 0; _backoff = TimeSpan.Zero; - _logger.LogInformation(NatsLogEvents.Connection, "Connection succeeded {Name}, NATS {Url}", _name, url); + _logger.LogInformation(NatsLogEvents.Connection, "Connection succeeded {Name}, NATS {Url} [{ReconnectCount}]", _name, url, reconnectCount); ConnectionState = NatsConnectionState.Open; _pingTimerCancellationTokenSource = new CancellationTokenSource(); StartPingTimer(_pingTimerCancellationTokenSource.Token); @@ -648,14 +683,26 @@ private async void ReconnectLoop() catch (Exception ex) { if (ex is OperationCanceledException) + { + try + { + _logger.LogDebug(NatsLogEvents.Connection, "Operation cancelled. Retry loop stopped because the connection was disposed [{ReconnectCount}]", reconnectCount); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } + return; + } + _waitForOpenConnection.TrySetException(ex); try { if (!IsDisposed) { // Only log if we're not disposing, otherwise we might log exceptions that are expected - _logger.LogError(NatsLogEvents.Connection, ex, "Retry loop stopped and connection state is invalid"); + _logger.LogError(NatsLogEvents.Connection, ex, "Retry loop stopped and connection state is invalid [{ReconnectCount}]", reconnectCount); } } catch @@ -666,6 +713,15 @@ private async void ReconnectLoop() // (e.g. we've seen this with EventLog provider on Windows) } } + + try + { + _logger.LogDebug(NatsLogEvents.Connection, "Reconnect loop stopped [{ReconnectCount}]", reconnectCount); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } } private async Task PublishEventsAsync() @@ -790,6 +846,8 @@ private async void StartPingTimer(CancellationToken cancellationToken) if (Opts.PingInterval == TimeSpan.Zero) return; + _logger.LogDebug(NatsLogEvents.Connection, "Starting ping timer"); + var periodicTimer = new PeriodicTimer(Opts.PingInterval); ResetPongCount(); try @@ -810,8 +868,28 @@ private async void StartPingTimer(CancellationToken cancellationToken) await periodicTimer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false); } } + catch (Exception e) + { + if (!IsDisposed) + { + try + { + _logger.LogWarning(NatsLogEvents.Connection, e, "Ping timer error"); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } + } + } + + try + { + _logger.LogDebug(NatsLogEvents.Connection, "Ping timer stopped"); + } catch { + // ignore logging exceptions in case our host might be disposed or shutting down } } @@ -834,6 +912,15 @@ private void EnqueuePing(PingCommand pingCommand) // catch and log all exceptions, enforcing the socketComponentDisposeTimeout private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component, string description) { + try + { + _logger.LogDebug(NatsLogEvents.Connection, "Dispose socket component {Description}", description); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } + try { var dispose = component.DisposeAsync(); @@ -842,7 +929,14 @@ private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component, } catch (Exception ex) { - _logger.LogError(NatsLogEvents.Connection, ex, $"Error occured when disposing {description}"); + try + { + _logger.LogError(NatsLogEvents.Connection, ex, $"Error occured when disposing {description}"); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } } } @@ -851,6 +945,15 @@ private async ValueTask DisposeSocketComponentAsync(IAsyncDisposable component, // Dispose Reader(Drain read buffers but no reads more) private async ValueTask DisposeSocketAsync(bool asyncReaderDispose) { + try + { + _logger.LogDebug(NatsLogEvents.Connection, "Disposing socket"); + } + catch + { + // ignore logging exceptions in case our host might be disposed or shutting down + } + if (_socket != null) { await DisposeSocketComponentAsync(_socket, "socket").ConfigureAwait(false);