Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 286, manually updated #346

Merged
merged 6 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/NATS.Client/EncodedConn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ internal EncodedConnection(Options opts)
private IFormatter f = new BinaryFormatter();

// Note, the connection locks around this, so while we are using a
// byte array in the connection, we are still threadsafe.
// byte array in the connection, we are still thread safe.
internal byte[] defaultSerializer(Object obj)
{
byte[] rv = null;
Expand Down Expand Up @@ -143,7 +143,7 @@ internal byte[] defaultSerializer(Object obj)
// it'd be nice to have a default serializer per subscription to avoid
// the lock here, but this mirrors go.
//
// TODO: Look at moving the default to the wrapper and keeping a derialization
// TODO: Look at moving the default to the wrapper and keeping a deserialization
// stream there.
internal object defaultDeserializer(byte[] data)
{
Expand Down Expand Up @@ -182,7 +182,7 @@ private void publishObject(string subject, string reply, object o)
/// <param name="obj">The <see cref="Object"/> to serialize and publish to the connected NATS server.</param>
/// <exception cref="NATSBadSubscriptionException"><paramref name="subject"/> is
/// <c>null</c> or entirely whitespace.</exception>
/// <exception cref="NATSMaxPayloadException">The serialzed form of <paramref name="obj"/> exceeds the maximum payload size
/// <exception cref="NATSMaxPayloadException">The serialized form of <paramref name="obj"/> exceeds the maximum payload size
/// supported by the NATS server.</exception>
/// <exception cref="NATSConnectionClosedException">The <see cref="Connection"/> is closed.</exception>
/// <exception cref="NATSException"><para><see cref="OnSerialize"/> is <c>null</c>.</para>
Expand All @@ -204,7 +204,7 @@ public void Publish(string subject, Object obj)
/// <param name="obj">The <see cref="Object"/> to serialize and publish to the connected NATS server.</param>
/// <exception cref="NATSBadSubscriptionException"><paramref name="subject"/> is <c>null</c> or
/// entirely whitespace.</exception>
/// <exception cref="NATSMaxPayloadException">The serialzed form of <paramref name="obj"/> exceeds the maximum payload size
/// <exception cref="NATSMaxPayloadException">The serialized form of <paramref name="obj"/> exceeds the maximum payload size
/// supported by the NATS server.</exception>
/// <exception cref="NATSConnectionClosedException">The <see cref="Connection"/> is closed.</exception>
/// <exception cref="NATSException"><para><see cref="OnSerialize"/> is <c>null</c>.</para>
Expand Down Expand Up @@ -359,7 +359,7 @@ private object requestObject(string subject, object obj, int timeout)
/// (<c>0</c>).</exception>
/// <exception cref="NATSBadSubscriptionException"><paramref name="subject"/> is <c>null</c> or
/// entirely whitespace.</exception>
/// <exception cref="NATSMaxPayloadException">The serialzed form of <paramref name="obj"/> exceeds the maximum payload size
/// <exception cref="NATSMaxPayloadException">The serialized form of <paramref name="obj"/> exceeds the maximum payload size
/// supported by the NATS server.</exception>
/// <exception cref="NATSConnectionClosedException">The <see cref="Connection"/> is closed.</exception>
/// <exception cref="NATSTimeoutException">A timeout occurred while sending the request or receiving the
Expand Down Expand Up @@ -387,7 +387,7 @@ public object Request(string subject, object obj, int timeout)
/// <returns>A <see cref="Object"/> with the deserialized response from the NATS server.</returns>
/// <exception cref="NATSBadSubscriptionException"><paramref name="subject"/> is <c>null</c> or
/// entirely whitespace.</exception>
/// <exception cref="NATSMaxPayloadException">The serialzed form of <paramref name="obj"/> exceeds the maximum payload size
/// <exception cref="NATSMaxPayloadException">The serialized form of <paramref name="obj"/> exceeds the maximum payload size
/// supported by the NATS server.</exception>
/// <exception cref="NATSConnectionClosedException">The <see cref="Connection"/> is closed.</exception>
/// <exception cref="NATSTimeoutException">A timeout occurred while sending the request or receiving the
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ internal NATSMaxMessagesException() : base("Maximum number of messages have been
public class NATSBadSubscriptionException : NATSException
{
internal NATSBadSubscriptionException() : base("Subscription is not valid.") { }
internal NATSBadSubscriptionException(string s) : base("s") { }
internal NATSBadSubscriptionException(string s) : base(s) { }
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public interface IConnection : IDisposable
/// <para>NATS implements a publish-subscribe message distribution model. NATS publish subscribe is a
/// one-to-many communication. A publisher sends a message on a subject. Any active subscriber listening
/// on that subject receives the message. Subscribers can register interest in wildcard subjects.</para>
/// <para>In the basic NATS platfrom, if a subscriber is not listening on the subject (no subject match),
/// or is not acive when the message is sent, the message is not recieved. NATS is a fire-and-forget
/// <para>In the basic NATS platform, if a subscriber is not listening on the subject (no subject match),
/// or is not active when the message is sent, the message is not received. NATS is a fire-and-forget
/// messaging system. If you need higher levels of service, you can either use NATS Streaming, or build the
/// additional reliability into your client(s) yourself.</para>
/// </remarks>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/ISubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ISubscription
/// <item>Wildcards must be separate tokens (<c>foo.*.bar</c> or <c>foo.&gt;</c> are syntactically
/// valid; <c>foo*.bar</c>, <c>f*o.b*r</c> and <c>foo&gt;</c> are not).</item>
/// </list>
/// <para>For example, the wildcard subscrpitions <c>foo.*.quux</c> and <c>foo.&gt;</c> both match
/// <para>For example, the wildcard subscriptions <c>foo.*.quux</c> and <c>foo.&gt;</c> both match
/// <c>foo.bar.quux</c>, but only the latter matches <c>foo.bar.baz</c>. With the full wildcard,
/// it is also possible to express interest in every subject that may exist in NATS (<c>&gt;</c>).</para>
/// </remarks>
Expand Down
13 changes: 11 additions & 2 deletions src/NATS.Client/Msg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,28 @@ public void AssignData(byte[] data)
/// <summary>
/// Gets the <see cref="ISubscription"/> which received the message.
/// </summary>
[ObsoleteAttribute("This property will soon be deprecated. Use ArrivalSubscription instead.")]
public ISubscription ArrivalSubcription
{
get { return sub; }
}

/// <summary>
/// Gets the <see cref="ISubscription"/> which received the message.
/// </summary>
public ISubscription ArrivalSubscription
{
get { return sub; }
}

/// <summary>
/// Send a response to the message on the arrival subscription.
/// </summary>
/// <param name="data">The response payload to send.</param>
/// <exception cref="NATSException">
/// <para><see cref="Reply"/> is null or empty.</para>
/// <para>-or-</para>
/// <para><see cref="ArrivalSubcription"/> is null.</para>
/// <para><see cref="ArrivalSubscription"/> is null.</para>
/// </exception>
public void Respond(byte[] data)
{
Expand All @@ -183,7 +192,7 @@ public void Respond(byte[] data)
throw new NATSException("No Reply subject");
}

Connection conn = ArrivalSubcription?.Connection;
Connection conn = ArrivalSubscription?.Connection;
if (conn == null)
{
throw new NATSException("Message is not bound to a subscription");
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/NATS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ internal get
{
if (signedNonce == null)
{
throw new NATSConnectionException("SignedNonce was not set by the UserSignature event hander.");
throw new NATSConnectionException("SignedNonce was not set by the UserSignature event handler.");
}
return signedNonce;
}
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void SetNkey(string publicNkey, EventHandler<UserSignatureEventArgs> user
/// sign the server nonce.
/// </summary>
/// <param name="publicNkey">The User's public Nkey</param>
/// <param name="privateKeyPath">A path to a file contianing the private Nkey.</param>
/// <param name="privateKeyPath">A path to a file containing the private Nkey.</param>
public void SetNkey(string publicNkey, string privateKeyPath)
{
if (string.IsNullOrWhiteSpace(publicNkey)) throw new ArgumentException("Invalid publicNkey", nameof(publicNkey));
Expand Down Expand Up @@ -509,12 +509,12 @@ public string CustomInboxPrefix
}

/// <summary>
/// Adds an X.509 certifcate from a file for use with a secure connection.
/// Adds an X.509 certificate from a file for use with a secure connection.
/// </summary>
/// <param name="fileName">Path to the certificate file to add.</param>
/// <exception cref="ArgumentNullException"><paramref name="fileName"/> is <c>null</c>.</exception>
/// <exception cref="System.Security.Cryptography.CryptographicException">An error with the certificate
/// ocurred. For example:
/// occurred. For example:
/// <list>
/// <item>The certificate file does not exist.</item>
/// <item>The certificate is invalid.</item>
Expand All @@ -528,12 +528,12 @@ public void AddCertificate(string fileName)
}

/// <summary>
/// Adds an X.509 certifcate for use with a secure connection.
/// Adds an X.509 certificate for use with a secure connection.
/// </summary>
/// <param name="certificate">An X.509 certificate represented as an <see cref="X509Certificate2"/> object.</param>
/// <exception cref="ArgumentNullException"><paramref name="certificate"/> is <c>null</c>.</exception>
/// <exception cref="System.Security.Cryptography.CryptographicException">An error with the certificate
/// ocurred. For example:
/// occurred. For example:
/// <list>
/// <item>The certificate file does not exist.</item>
/// <item>The certificate is invalid.</item>
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client/ServerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public int GetHashCode(Srv obj)

// Create the server pool using the options given.
// We will place a Url option first, followed by any
// Server Options. We will randomize the server pool unlesss
// Server Options. We will randomize the server pool unless
// the NoRandomize flag is set.
internal void Setup(Options opts)
{
Expand Down Expand Up @@ -220,7 +220,7 @@ private bool add(Srv s)
}
}

// removes implict servers NOT found in the provided list.
// removes implicit servers NOT found in the provided list.
internal void PruneOutdatedServers(string[] newUrls)
{
LinkedList<string> ulist = new LinkedList<string>(newUrls);
Expand All @@ -235,7 +235,7 @@ internal void PruneOutdatedServers(string[] newUrls)
foreach (Srv s in tmp)
{
// The server returns "<host>:<port>". We can't compare
// againts Uri.Authority becase that API may strip out
// against Uri.Authority because that API may strip out
// ports.
string hp = string.Format("{0}:{1}", s.url.Host, s.url.Port);
if (s.isImplicit && !ulist.Contains(hp) &&
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/Statistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public long Reconnects
get { return reconnects; }
}

// deep copy contructor
// deep copy constructor
internal Statistics(Statistics obj)
{
this.inMsgs = obj.inMsgs;
Expand Down
2 changes: 1 addition & 1 deletion src/Tests/IntegrationTests/TestBasic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void TestAsyncSubscribe()
private void CheckReceivedAndValidHandler(object sender, MsgHandlerEventArgs args)
{
Assert.True(compare(args.Message.Data, omsg), "Messages are not equal.");
Assert.Equal(asyncSub, args.Message.ArrivalSubcription);
Assert.Equal(asyncSub, args.Message.ArrivalSubscription);

lock (mu)
{
Expand Down
62 changes: 31 additions & 31 deletions src/Tests/IntegrationTests/TestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
using Xunit;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Timers;
using System.Timers;

namespace IntegrationTests
{
public class TestConnection : TestSuite<ConnectionSuiteContext>
Expand Down Expand Up @@ -793,7 +793,7 @@ public void TestDrain()
c.Drain();

done.WaitOne(5000);
Assert.True(received == expected, string.Format("recieved {0} of {1}", received, expected));
Assert.True(received == expected, string.Format("received {0} of {1}", received, expected));
}
}

Expand Down Expand Up @@ -833,7 +833,7 @@ public void TestDrainAsync()
t.Wait();

done.WaitOne(5000);
Assert.True(received == expected, string.Format("recieved {0} of {1}", received, expected));
Assert.True(received == expected, string.Format("received {0} of {1}", received, expected));
}
}

Expand Down Expand Up @@ -867,7 +867,7 @@ public void TestDrainSub()
s.Drain();

done.WaitOne(5000);
Assert.True(received == expected, string.Format("recieved {0} of {1}", received, expected));
Assert.True(received == expected, string.Format("received {0} of {1}", received, expected));
}
}

Expand Down Expand Up @@ -907,7 +907,7 @@ public void TestDrainSubAsync()
t.Wait();

done.WaitOne(5000);
Assert.True(received == expected, string.Format("recieved {0} of {1}", received, expected));
Assert.True(received == expected, string.Format("received {0} of {1}", received, expected));
}
}

Expand Down Expand Up @@ -1034,7 +1034,7 @@ public async Task TestDrainStateBehavior()
var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port);
opts.ClosedEventHandler = (obj, args) =>
{
closed.Set();
closed.Set();
};
var c = Context.ConnectionFactory.CreateConnection(opts);
var s = c.SubscribeAsync("foo", (obj, args) =>
Expand Down Expand Up @@ -1067,35 +1067,35 @@ public async Task TestDrainStateBehavior()
}

[Fact]
public void TestFlushBuffer()
{
IConnection c = null;
AutoResetEvent disconnected = new AutoResetEvent(false);
AutoResetEvent closed = new AutoResetEvent(false);
public void TestFlushBuffer()
{
IConnection c = null;
AutoResetEvent disconnected = new AutoResetEvent(false);
AutoResetEvent closed = new AutoResetEvent(false);

using (NATSServer.CreateFastAndVerify(Context.Server1.Port))
{
{


var opts = Context.GetTestOptionsWithDefaultTimeout(Context.Server1.Port);
opts.ClosedEventHandler = (obj, args) =>
{
closed.Set();
closed.Set();
};
opts.DisconnectedEventHandler = (obj, args) =>
{
disconnected.Set();
disconnected.Set();
};

c = Context.ConnectionFactory.CreateConnection(opts);
// test empty buffer
c.FlushBuffer();
// test multiple calls
c.FlushBuffer();
c.Publish("foo", new byte[10240]);
c.FlushBuffer();
c = Context.ConnectionFactory.CreateConnection(opts);

// test empty buffer
c.FlushBuffer();
// test multiple calls
c.FlushBuffer();

c.Publish("foo", new byte[10240]);
c.FlushBuffer();
}

// wait until we're disconnected
Expand All @@ -1108,12 +1108,12 @@ public void TestFlushBuffer()
c.FlushBuffer();

// close and then check the closed connection.
c.Close();
Assert.True(closed.WaitOne(10000));
Assert.Throws<NATSConnectionClosedException>(() => c.FlushBuffer());
c.Dispose();
}
c.Close();
Assert.True(closed.WaitOne(10000));
Assert.Throws<NATSConnectionClosedException>(() => c.FlushBuffer());

c.Dispose();
}
}

public class TestConnectionMemoryLeaks : TestSuite<ConnectionMemoryLeaksSuiteContext>
Expand Down