Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually redo of previous reconnect-buffer PR #314

Merged
merged 4 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,27 @@ internal void publish(string subject, string reply, byte[] data, int offset, int
// write our pubProtoBuf buffer to the buffered writer.
int pubProtoLen = writePublishProto(pubProtoBuf, subject, reply, count);

// Check if we are reconnecting, and if so check if
// we have exceeded our reconnect outbound buffer limits.
// Don't use IsReconnecting to avoid locking.
if (status == ConnState.RECONNECTING)
{
int rbsize = opts.ReconnectBufferSize;
if (rbsize != 0)
{
if (rbsize == -1)
throw new NATSReconnectBufferException("Reconnect buffering has been disabled.");

if (flushBuffer)
bw.Flush();
else
kickFlusher();

if (pending.Position + count + pubProtoLen > rbsize)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ColinSullivan1 Replying to your Q here instead to get the context.

First publish: 0+20+12 > 32

Second publish: 0+21+12 > 32

throw new NATSReconnectBufferException("Reconnect buffer exceeded.");
}
}

bw.Write(pubProtoBuf, 0, pubProtoLen);

if (count > 0)
Expand Down Expand Up @@ -3754,7 +3775,30 @@ private void close(ConnState closeState, bool invokeDelegates)
{
bw.Dispose();
}
catch (Exception) { /* ignore */ }
catch (Exception)
{
/* ignore */
}
finally
{
bw = null;
}
}

if (br != null)
{
try
{
br.Dispose();
}
catch
{
// ignored
}
finally
{
br = null;
}
}

conn.teardown();
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ internal NATSConnectionException(string err) : base(err) { }
internal NATSConnectionException(string err, Exception innerEx) : base(err, innerEx) { }
}

/// <summary>
/// The exception that is thrown when there is an error writing
/// to the internal reconnect buffer.
/// </summary>
public class NATSReconnectBufferException : NATSConnectionException
{
internal NATSReconnectBufferException(string err) : base(err) { }
}

/// <summary>
/// This exception that is thrown when there is an internal error with
/// the NATS protocol.
Expand Down
4 changes: 4 additions & 0 deletions src/NATS.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public interface IConnection : IDisposable
/// the current connection.</param>
/// <param name="data">An array of type <see cref="Byte"/> that contains the data to publish
/// to the connected NATS server.</param>
/// <exception cref="NATSReconnectBufferException"> is thrown when
/// publishing while reconnecting and the internal reconnect buffer
/// has been disabled or exceeded.</exception>
/// <seealso cref="Options.ReconnectBufferSize"></seealso>
void Publish(string subject, byte[] data);

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client/NATS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public static class Defaults
/// </summary>
public const int DefaultDrainTimeout = 30000;

/// <summary>
/// Default Pending buffer size is 8 MB.
/// </summary>
public const int ReconnectBufferSize = 8 * 1024 * 1024; // 8MB

/*
* Namespace level defaults
*/
Expand Down
42 changes: 41 additions & 1 deletion src/NATS.Client/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public void SetJWTEventHandlers(EventHandler<UserJWTEventArgs> JWTEventHandler,

// Must be greater than 0.
internal int subscriptionBatchSize = 64;
internal int reconnectBufSize = Defaults.ReconnectBufferSize;

internal string user;
internal string password;
Expand All @@ -208,6 +209,7 @@ internal Options(Options o)
noRandomize = o.noRandomize;
noEcho = o.noEcho;
pedantic = o.pedantic;
reconnectBufSize = o.reconnectBufSize;
useOldRequestStyle = o.useOldRequestStyle;
pingInterval = o.pingInterval;
ReconnectedEventHandler = o.ReconnectedEventHandler;
Expand Down Expand Up @@ -269,7 +271,7 @@ public string Url
}

/// <summary>
/// Gets or sets the array of servers that the NATs client will connect to.
/// Gets or sets the array of servers that the NATS client will connect to.
/// </summary>
/// <remarks>
/// The individual URLs may contain username/password information.
Expand Down Expand Up @@ -585,6 +587,43 @@ private void appendEventHandler(StringBuilder sb, String name, Delegate eh)
sb.AppendFormat("{0}=null;", name);
}


/// <summary>
/// Constant used to sets the reconnect buffer size to unbounded.
/// </summary>
/// <seealso cref="ReconnectBufferSize"/>
public static readonly int ReconnectBufferSizeUnbounded = 0;

/// <summary>
/// Constant that disables the reconnect buffer.
/// </summary>
/// <seealso cref="ReconnectBufferSize"/>
public static readonly int ReconnectBufferDisabled = -1;

/// <summary>
/// Gets or sets the buffer size of messages kept while busy reconnecting.
/// </summary>
/// <remarks>
/// When reconnecting, the NATS client will hold published messages that
/// will be flushed to the new server upon a successful reconnect. The default
/// is buffer size is 8 MB. This buffering can be disabled.
/// </remarks>
/// <seealso cref="ReconnectBufferSizeUnbounded"/>
/// <seealso cref="ReconnectBufferDisabled"/>
public int ReconnectBufferSize
{
get { return reconnectBufSize; }
set
{
if (value < -1)
{
throw new ArgumentOutOfRangeException("value", "Reconnect buffer size must be greater than or equal to -1");
}

reconnectBufSize = value;
}
}

/// <summary>
/// Returns a string representation of the
/// value of this Options instance.
Expand All @@ -609,6 +648,7 @@ public override string ToString()
sb.AppendFormat("Pendantic={0};", Pedantic);
sb.AppendFormat("UseOldRequestStyle={0}", UseOldRequestStyle);
sb.AppendFormat("PingInterval={0};", PingInterval);
sb.AppendFormat("ReconnectBufferSize={0};", ReconnectBufferSize);
sb.AppendFormat("ReconnectWait={0};", ReconnectWait);
sb.AppendFormat("Secure={0};", Secure);
sb.AppendFormat("User={0};", User);
Expand Down
2 changes: 0 additions & 2 deletions src/Tests/IntegrationTests/TestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,6 @@ public void TestInfineReconnect()
}

/// NOT IMPLEMENTED:
/// TestServerSecureConnections
/// TestErrOnConnectAndDeadlock
/// TestErrOnMaxPayloadLimit
}

Expand Down
87 changes: 87 additions & 0 deletions src/Tests/IntegrationTests/TestReconnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,93 @@ public void TestReconnectVerbose()
c.Flush();
}
}

[Fact]
public void TestReconnectBufferProperty()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.ReconnectBufferSize = Options.ReconnectBufferDisabled;
opts.ReconnectBufferSize = Options.ReconnectBufferSizeUnbounded;
opts.ReconnectBufferSize = 1024 * 1024;
Assert.Throws<ArgumentOutOfRangeException>(() => { opts.ReconnectBufferSize = -2; });
}

[Fact]
public void TestReconnectBufferDisabled()
{
IConnection c;
ISyncSubscription s;

AutoResetEvent disconnected = new AutoResetEvent(false);
AutoResetEvent reconnected = new AutoResetEvent(false);

var opts = Context.GetTestOptions(Context.Server1.Port);
opts.ReconnectBufferSize = Options.ReconnectBufferDisabled;
opts.DisconnectedEventHandler = (obj, args) => { disconnected.Set(); };
opts.ReconnectedEventHandler = (obj, args) => { reconnected.Set(); };

using (var server = NATSServer.Create(Context.Server1.Port))
{
// Create our client connections.
c = new ConnectionFactory().CreateConnection(opts);
s = c.SubscribeSync("foo");
// let the server shutdown via dispose
}

// wait until we're disconnected.
Assert.True(disconnected.WaitOne(5000));


// Publish a message.
Assert.Throws<NATSReconnectBufferException>( () => { c.Publish("foo", null); });

using (var server = NATSServer.Create(Context.Server1.Port))
{
// wait for the client to reconnect.
Assert.True(reconnected.WaitOne(20000));

// Check that we do not receive a message.
Assert.Throws<NATSTimeoutException>(() => { s.NextMessage(1000); });
}

c.Close();
c.Dispose();
}

[Fact]
public void TestReconnectBufferBoundary()
{
IConnection c;
ISubscription s;

AutoResetEvent disconnected = new AutoResetEvent(false);

var opts = Context.GetTestOptions(Context.Server1.Port);
opts.ReconnectBufferSize = 32; // 32 bytes
opts.DisconnectedEventHandler = (obj, args) => { disconnected.Set(); };
EventHandler<MsgHandlerEventArgs> eh = (obj, args) => { /* NOOP */ };

using (var server = NATSServer.Create(Context.Server1.Port))
{
c = new ConnectionFactory().CreateConnection(opts);
s = c.SubscribeAsync("foo", eh);

// let the server shutdown via dispose
}

// wait until we're disconnected.
Assert.True(disconnected.WaitOne(5000));

// PUB foo 25\r\n<...> = 30 so first publish should be OK, 2nd publish
// should fail.
byte[] okPayload = new byte[20];
byte[] exceedingPayload = new byte[21];
c.Publish("foo", okPayload);
Assert.Throws<NATSReconnectBufferException>(() => c.Publish("foo", exceedingPayload));

c.Close();
c.Dispose();
}
}

public class TestPublishErrorsDuringReconnect : TestSuite<PublishErrorsDuringReconnectSuiteContext>
Expand Down