Skip to content

Commit

Permalink
Connect Enhancements (#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 30, 2023
1 parent 36742e8 commit e199ed6
Show file tree
Hide file tree
Showing 20 changed files with 470 additions and 169 deletions.
64 changes: 43 additions & 21 deletions src/NATS.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,15 @@ private void processConnectInit(Srv s)
try
{
conn.ReceiveTimeout = opts.Timeout;
processExpectedInfo(s);
if (!opts.TlsFirst)
{
processExpectedInfo();
}
checkForSecure(s);
if (opts.TlsFirst)
{
processExpectedInfo();
}
sendConnect();
}
catch (IOException ex)
Expand Down Expand Up @@ -1188,8 +1196,10 @@ internal bool connect(Srv s, out Exception exToThrow)
}
catch (NATSConnectionException ex)
{
if (!ex.IsAuthorizationViolationError() && !ex.IsAuthenticationExpiredError())
if (!ex.IsAuthenticationOrAuthorizationError())
{
throw;
}

ScheduleErrorEvent(s, ex);

Expand Down Expand Up @@ -1222,7 +1232,7 @@ internal void ScheduleErrorEvent(object sender, NATSException ex, Subscription s
opts.AsyncErrorEventHandlerOrDefault(sender, new ErrEventArgs(this, subscription, ex.Message)));
}

internal void connect()
internal void connect(bool reconnectOnConnect)
{
Exception exToThrow = null;

Expand All @@ -1234,11 +1244,22 @@ internal void connect()
{
if (status != ConnState.CONNECTED)
{
if (exToThrow is NATSException)
throw exToThrow;
if (exToThrow != null)
throw new NATSConnectionException("Failed to connect", exToThrow);
throw new NATSNoServersException("Unable to connect to a server.");
if (reconnectOnConnect)
{
doReconnect();
}
else
{
if (exToThrow is NATSException)
{
throw exToThrow;
}
if (exToThrow != null)
{
throw new NATSConnectionException("Failed to connect", exToThrow);
}
throw new NATSNoServersException("Unable to connect to a server.");
}
}
}
}
Expand All @@ -1248,17 +1269,20 @@ internal void connect()
// only be called after the INIT protocol has been received.
private void checkForSecure(Srv s)
{
// Check to see if we need to engage TLS
// Check for mismatch in setups
if (Opts.Secure && !info.TlsRequired)
{
throw new NATSSecureConnWantedException();
}
else if (info.TlsRequired && !Opts.Secure)
if (!Opts.TlsFirst)
{
// If the server asks us to be secure, give it
// a shot.
Opts.Secure = true;
// Check to see if we need to engage TLS
// Check for mismatch in setups
if (Opts.Secure && !info.TlsRequired)
{
throw new NATSSecureConnWantedException();
}
else if (info.TlsRequired && !Opts.Secure)
{
// If the server asks us to be secure, give it
// a shot.
Opts.Secure = true;
}
}

// Need to rewrap with bufio if options tell us we need
Expand All @@ -1272,10 +1296,9 @@ private void checkForSecure(Srv s)
// processExpectedInfo will look for the expected first INFO message
// sent when a connection is established. The lock should be held entering.
// Caller must lock.
private void processExpectedInfo(Srv s)
private void processExpectedInfo()
{
Control c;

try
{
conn.SendTimeout = 2;
Expand All @@ -1298,7 +1321,6 @@ private void processExpectedInfo(Srv s)

// do not notify listeners of server changes when we process the first INFO message
processInfo(c.args, false);
checkForSecure(s);
}

internal void SendUnsub(long sid, int max)
Expand Down
45 changes: 27 additions & 18 deletions src/NATS.Client/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ public ConnectionFactory() { }
/// </remarks>
/// <param name="url">A string containing the URL (or URLs) to the NATS Server. See the Remarks
/// section for more information.</param>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IConnection CreateConnection(string url)
public IConnection CreateConnection(string url, bool reconnectOnConnect = false)
{
return CreateConnection(GetDefaultOptions(url));
return CreateConnection(GetDefaultOptions(url), reconnectOnConnect);
}

/// <summary>
Expand All @@ -57,20 +58,21 @@ public IConnection CreateConnection(string url)
/// <param name="url">A string containing the URL (or URLs) to the NATS Server. See the Remarks
/// section for more information.</param>
/// <param name="credentialsPath">The full path to a chained credentials file.</param>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IConnection CreateConnection(string url, string credentialsPath)
public IConnection CreateConnection(string url, string credentialsPath, bool reconnectOnConnect = false)
{
if (string.IsNullOrWhiteSpace(credentialsPath))
throw new ArgumentException("Invalid credentials path", nameof(credentialsPath));

Options opts = GetDefaultOptions(url);
opts.SetUserCredentials(credentialsPath);
return CreateConnection(opts);
return CreateConnection(opts, reconnectOnConnect);
}

/// <summary>
Expand All @@ -84,13 +86,14 @@ public IConnection CreateConnection(string url, string credentialsPath)
/// section for more information.</param>
/// <param name="jwt">The path to a user's public JWT credentials.</param>
/// <param name="privateNkey">The path to a file for user user's private Nkey seed.</param>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IConnection CreateConnection(string url, string jwt, string privateNkey)
public IConnection CreateConnection(string url, string jwt, string privateNkey, bool reconnectOnConnect = false)
{
if (string.IsNullOrWhiteSpace(jwt))
throw new ArgumentException("Invalid jwt path", nameof(jwt));
Expand All @@ -99,7 +102,7 @@ public IConnection CreateConnection(string url, string jwt, string privateNkey)

Options opts = GetDefaultOptions(url);
opts.SetUserCredentials(jwt, privateNkey);
return CreateConnection(opts);
return CreateConnection(opts, reconnectOnConnect);
}

/// <summary>
Expand Down Expand Up @@ -127,50 +130,53 @@ public static Options GetDefaultOptions(string server = null)
/// </remarks>
/// <param name="url">A string containing the URL (or URLs) to the NATS Server. See the Remarks
/// section for more information.</param>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IConnection CreateSecureConnection(string url)
public IConnection CreateSecureConnection(string url, bool reconnectOnConnect = false)
{
Options opts = GetDefaultOptions(url);
opts.Secure = true;
return CreateConnection(opts);
return CreateConnection(opts, reconnectOnConnect);
}

/// <summary>
/// Create a connection to the NATs server using the default options.
/// </summary>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
/// <seealso cref="GetDefaultOptions"/>
public IConnection CreateConnection()
public IConnection CreateConnection(bool reconnectOnConnect = false)
{
return CreateConnection(GetDefaultOptions());
return CreateConnection(GetDefaultOptions(), reconnectOnConnect);
}

/// <summary>
/// Create a connection to a NATS Server defined by the given options.
/// </summary>
/// <param name="opts">The NATS client options to use for this connection.</param>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IConnection"/> object connected to the NATS server.</returns>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IConnection CreateConnection(Options opts)
public IConnection CreateConnection(Options opts, bool reconnectOnConnect = false)
{
Connection nc = new Connection(opts);
try
{
nc.connect();
nc.connect(reconnectOnConnect);
}
catch (Exception)
{
Expand All @@ -183,16 +189,17 @@ public IConnection CreateConnection(Options opts)
/// <summary>
/// Attempt to connect to the NATS server, with an encoded connection, using the default options.
/// </summary>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <returns>An <see cref="IEncodedConnection"/> object connected to the NATS server.</returns>
/// <seealso cref="GetDefaultOptions"/>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IEncodedConnection CreateEncodedConnection()
public IEncodedConnection CreateEncodedConnection(bool reconnectOnConnect = false)
{
return CreateEncodedConnection(GetDefaultOptions());
return CreateEncodedConnection(GetDefaultOptions(), reconnectOnConnect);
}

/// <summary>
Expand All @@ -201,6 +208,7 @@ public IEncodedConnection CreateEncodedConnection()
/// <remarks>
/// <para><paramref name="url"/> can contain username/password semantics.
/// Comma seperated arrays are also supported, e.g. urlA, urlB.</para>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// </remarks>
/// <param name="url">A string containing the URL (or URLs) to the NATS Server. See the Remarks
/// section for more information.</param>
Expand All @@ -210,27 +218,28 @@ public IEncodedConnection CreateEncodedConnection()
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IEncodedConnection CreateEncodedConnection(string url)
public IEncodedConnection CreateEncodedConnection(string url, bool reconnectOnConnect = false)
{
return CreateEncodedConnection(GetDefaultOptions(url));
return CreateEncodedConnection(GetDefaultOptions(url), reconnectOnConnect);
}

/// <summary>
/// Attempt to connect to the NATS server, with an encoded connection, using the given options.
/// </summary>
/// <param name="opts">The NATS client options to use for this connection.</param>
/// <returns>An <see cref="IEncodedConnection"/> object connected to the NATS server.</returns>
/// <param name="reconnectOnConnect">if true, the connection will treat the initial connection as any other and attempt reconnects on failure</param>
/// <exception cref="NATSNoServersException">No connection to a NATS Server could be established.</exception>
/// <exception cref="NATSConnectionException"><para>A timeout occurred connecting to a NATS Server.</para>
/// <para>-or-</para>
/// <para>An exception was encountered while connecting to a NATS Server. See <see cref="Exception.InnerException"/> for more
/// details.</para></exception>
public IEncodedConnection CreateEncodedConnection(Options opts)
public IEncodedConnection CreateEncodedConnection(Options opts, bool reconnectOnConnect = false)
{
EncodedConnection nc = new EncodedConnection(opts);
try
{
nc.connect();
nc.connect(reconnectOnConnect);
}
catch (Exception)
{
Expand Down
15 changes: 7 additions & 8 deletions src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ public class NATSConnectionException : NATSException
{
public NATSConnectionException(string err) : base(err) { }
public NATSConnectionException(string err, Exception innerEx) : base(err, innerEx) { }
}

internal static class NATSConnectionExceptionExtensions
{
internal static bool IsAuthorizationViolationError(this NATSConnectionException ex)
=> ex?.Message.Equals("'authorization violation'", StringComparison.OrdinalIgnoreCase) == true;

internal static bool IsAuthenticationExpiredError(this NATSConnectionException ex)
=> ex?.Message.Equals("'authentication expired'", StringComparison.OrdinalIgnoreCase) == true;
public bool IsAuthenticationOrAuthorizationError()
{
string lowerMessage = Message.ToLower();
return lowerMessage.Contains("user authentication")
|| lowerMessage.Contains("authorization violation")
|| lowerMessage.Contains("authentication expired");
}
}

/// <summary>
Expand Down
Loading

0 comments on commit e199ed6

Please sign in to comment.