Skip to content

Commit

Permalink
Extensive logging for reconnect debugging
Browse files: browse the repository at this point in the history
  • Loading branch information
mtmk committed Aug 5, 2024
1 parent da96f73 commit 0c7ec12
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 16 deletions.
66 changes: 65 additions & 1 deletion src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -29,6 +30,8 @@ internal sealed class CommandWriter : IAsyncDisposable
private const int MinSegmentSize = 16384;

private readonly ILogger<CommandWriter> _logger;
private readonly bool _trace;
private readonly string _name;
private readonly NatsConnection _connection;
private readonly ObjectPool _pool;
private readonly int _arrayPoolInitialSize;
Expand All @@ -52,9 +55,11 @@ internal sealed class CommandWriter : IAsyncDisposable
private CancellationTokenSource? _ctsReader;
private volatile bool _disposed;

public CommandWriter(NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
{
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
_trace = _logger.IsEnabled(LogLevel.Trace);
_name = name;
_connection = connection;
_pool = pool;

Expand All @@ -78,10 +83,14 @@ public CommandWriter(NatsConnection connection, ObjectPool pool, NatsOpts opts,
useSynchronizationContext: false));
_pipeReader = pipe.Reader;
_pipeWriter = pipe.Writer;

_logger.LogDebug(NatsLogEvents.Buffer, "Created {Name}", _name);
}

public void Reset(ISocketConnection socketConnection)
{
_logger.LogDebug(NatsLogEvents.Buffer, "Resetting {Name}", _name);

lock (_lock)
{
_socketConnection = socketConnection;
Expand All @@ -104,6 +113,8 @@ await ReaderLoopAsync(

public async Task CancelReaderLoopAsync()
{
_logger.LogDebug(NatsLogEvents.Buffer, "Canceling reader loop");

CancellationTokenSource? cts;
Task? readerTask;
lock (_lock)
Expand All @@ -130,10 +141,14 @@ public async Task CancelReaderLoopAsync()
// closed by the time we get here or soon after.
await readerTask.WaitAsync(_cts.Token).ConfigureAwait(false);
}

_logger.LogDebug(NatsLogEvents.Buffer, "Cancelled reader loop successfully");
}

public async ValueTask DisposeAsync()
{
_logger.LogDebug(NatsLogEvents.Buffer, "Disposing {Name}", _name);

if (_disposed)
{
return;
Expand Down Expand Up @@ -163,6 +178,11 @@ public async ValueTask DisposeAsync()

public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "CONNECT");
}

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
Expand Down Expand Up @@ -197,6 +217,11 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella

public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PING");
}

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
Expand Down Expand Up @@ -232,6 +257,11 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati

public ValueTask PongAsync(CancellationToken cancellationToken = default)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PONG");
}

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
Expand Down Expand Up @@ -266,6 +296,11 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)

public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", subject, replyTo);
}

NatsPooledBufferWriter<byte>? headersBuffer = null;
if (headers != null)
{
Expand Down Expand Up @@ -348,6 +383,11 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,

public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "SUB {Subject} {QueueGroup} {MaxMsgs}", subject, queueGroup, maxMsgs);
}

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
Expand Down Expand Up @@ -382,6 +422,11 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int

public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "UNSUB {Sid} {MaxMsgs}", sid, maxMsgs);
}

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
Expand Down Expand Up @@ -445,6 +490,10 @@ private static async Task ReaderLoopAsync(
{
try
{
var trace = logger.IsEnabled(LogLevel.Trace);
logger.LogDebug(NatsLogEvents.Buffer, "Starting send buffer reader loop");

var stopwatch = Stopwatch.StartNew();
var examinedOffset = 0;
var pending = 0;
while (true)
Expand Down Expand Up @@ -481,7 +530,18 @@ private static async Task ReaderLoopAsync(
Exception? sendEx = null;
try
{
if (trace)
{
stopwatch.Restart();
}

sent = await connection.SendAsync(sendMem).ConfigureAwait(false);

if (trace)
{
stopwatch.Stop();
logger.LogTrace(NatsLogEvents.Buffer, "Socket.SendAsync Size: {Sent}/{Size} Elapsed: {ElapsedMs}ms", sent, sendMem.Length, stopwatch.Elapsed.TotalMilliseconds);
}
}
catch (Exception ex)
{
Expand Down Expand Up @@ -589,10 +649,12 @@ private static async Task ReaderLoopAsync(
catch (OperationCanceledException)
{
// Expected during shutdown
logger.LogDebug(NatsLogEvents.Buffer, "Operation canceled in send buffer reader loop (expected during shutdown)");
}
catch (ObjectDisposedException)
{
// Expected during shutdown
logger.LogDebug(NatsLogEvents.Buffer, "Object disposed in send buffer reader loop (expected during shutdown)");
}
catch (SocketException e)
{
Expand All @@ -613,6 +675,8 @@ private static async Task ReaderLoopAsync(
{
logger.LogError(NatsLogEvents.Buffer, e, "Unexpected error in send buffer reader loop");
}

logger.LogDebug(NatsLogEvents.Buffer, "Exiting send buffer reader loop");
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable

public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
{
CommandWriter = new CommandWriter(connection, pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan);
CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan);
CommandWriter.Reset(socketConnection);
}

Expand Down
33 changes: 33 additions & 0 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ private async Task ReadLoopAsync()
{
// https://docs.nats.io/reference/reference-protocols/nats-protocol#msg
// MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "MSG");
}

// Try to find the first \n in the buffer, i.e. the position just before the payload
var positionBeforePayload = buffer.PositionOf((byte)'\n');
Expand Down Expand Up @@ -220,6 +224,10 @@ private async Task ReadLoopAsync()
{
// https://docs.nats.io/reference/reference-protocols/nats-protocol#hmsg
// HMSG <subject> <sid> [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "HMSG");
}

// Find the end of 'HMSG' first message line
var positionBeforeNatsHeader = buffer.PositionOf((byte)'\n');
Expand Down Expand Up @@ -309,6 +317,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R

if (code == ServerOpCodes.Ping)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PING");
}

const int PingSize = 6; // PING\r\n

await _connection.PongAsync().ConfigureAwait(false); // return pong to server
Expand All @@ -324,6 +337,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
}
else if (code == ServerOpCodes.Pong)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PONG");
}

const int PongSize = 6; // PONG\r\n

_connection.ResetPongCount(); // reset count for PingTimer
Expand All @@ -345,6 +363,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
}
else if (code == ServerOpCodes.Error)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "ERR");
}

// try to get \n.
var position = buffer.PositionOf((byte)'\n');
if (position == null)
Expand All @@ -367,6 +390,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
}
else if (code == ServerOpCodes.Ok)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "OK");
}

const int OkSize = 5; // +OK\r\n

if (length < OkSize)
Expand All @@ -380,6 +408,11 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
}
else if (code == ServerOpCodes.Info)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "INFO");
}

// try to get \n.
var position = buffer.PositionOf((byte)'\n');

Expand Down
26 changes: 26 additions & 0 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed record SubscriptionMetadata(int Sid);
internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable
{
private readonly ILogger<SubscriptionManager> _logger;
private readonly bool _trace;
private readonly bool _debug;
private readonly object _gate = new();
private readonly NatsConnection _connection;
Expand All @@ -39,6 +40,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
_inboxPrefix = inboxPrefix;
_logger = _connection.Opts.LoggerFactory.CreateLogger<SubscriptionManager>();
_debug = _logger.IsEnabled(LogLevel.Debug);
_trace = _logger.IsEnabled(LogLevel.Trace);
_cts = new CancellationTokenSource();
_cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval;
_timer = Task.Run(CleanupAsync);
Expand Down Expand Up @@ -90,13 +92,23 @@ public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationT

public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Subscription, "Received subscription data for {Subject}/{Sid}", subject, sid);
}

int? orphanSid = null;
lock (_gate)
{
if (_bySid.TryGetValue(sid, out var sidMetadata))
{
if (sidMetadata.WeakReference.TryGetTarget(out var sub))
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Subscription, "Found subscription handler for {Subject}/{Sid}", subject, sid);
}

return sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer);
}
else
Expand Down Expand Up @@ -182,6 +194,11 @@ public ValueTask RemoveAsync(NatsSubBase sub)
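/// <returns>A task that completes once the reconnect commands of every live subscription have been written to <paramref name="commandWriter"/></returns>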
public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
{
if (_debug)
{
_logger.LogDebug(NatsLogEvents.Subscription, "Reconnect commands requested");
}

var subs = new List<(NatsSubBase, int)>();
lock (_gate)
{
Expand All @@ -191,12 +208,21 @@ public async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter)
{
subs.Add((sub, sid));
}
else
{
_logger.LogError(NatsLogEvents.Subscription, "While reconnecting found subscription GCd but was never disposed {SidMetadataSubject}/{Sid}", sidMetadata.Subject, sid);
}
}
}

foreach (var (sub, sid) in subs)
{
await sub.WriteReconnectCommandsAsync(commandWriter, sid).ConfigureAwait(false);

if (_debug)
{
_logger.LogDebug(NatsLogEvents.Subscription, "Wrote reconnect commands for subscription {Subject}", sub.Subject);
}
}
}

Expand Down
Loading

0 comments on commit 0c7ec12

Please sign in to comment.