Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JetStream Reply Subjects #365

Merged
merged 7 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 58 additions & 22 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public Options Opts
// used to map replies to requests from client (should lock)
private long nextRequestId = 0;

private readonly ConcurrentDictionary<string, InFlightRequest> waitingRequests
= new ConcurrentDictionary<string, InFlightRequest>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, InFlightRequest> waitingRequests
= new Dictionary<string, InFlightRequest>(StringComparer.OrdinalIgnoreCase);

// Prepare protocol messages for efficiency
private byte[] PING_P_BYTES = null;
Expand Down Expand Up @@ -2592,24 +2592,55 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c

protected Msg request(string subject, byte[] data, int offset, int count, int timeout) => requestSync(subject, data, offset, count, timeout);

private void removeOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _);
private void RemoveOutstandingRequest(string requestId)
{
lock (mu)
{
waitingRequests.Remove(requestId);
}
}

private void requestResponseHandler(object sender, MsgHandlerEventArgs e)
private void RequestResponseHandler(object sender, MsgHandlerEventArgs e)
{
if (e.Message == null)
return;
InFlightRequest request;
bool isClosed;

// \
// \/
// _INBOX.<nuid>.<requestId>
var requestId = e.Message.Subject.Substring(globalRequestInbox.Length + 1);
if (!waitingRequests.TryGetValue(requestId, out var request))
if (e.Message == null)
return;

bool isClosed;
var subject = e.Message.Subject;

lock (mu)
{
// if it's a typical response, process normally.
if (subject.StartsWith(globalRequestInbox))
{
// \
// \/
// _INBOX.<nuid>.<requestId>
var requestId = subject.Substring(globalRequestInbox.Length + 1);
if (!waitingRequests.TryGetValue(requestId, out request))
return;
}
else
{
// We have a jetstream subject (remapped), so if there's only one
// request assume we're OK and handle it.
if (waitingRequests.Count == 1)
{
InFlightRequest[] values = new InFlightRequest[1];
waitingRequests.Values.CopyTo(values, 0);
request = values[0];

}
else
{
// if we get here, we have multiple outsanding jetstream
// requests. We can't tell which is which we'll punt.
return;
}
}

isClosed = this.isClosed();
}

Expand All @@ -2630,18 +2661,21 @@ private InFlightRequest setupRequest(int timeout, CancellationToken token)
if (requestId < 0) //Check if recycled
requestId = (requestId + long.MaxValue + 1);

var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, removeOutstandingRequest);
var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, RemoveOutstandingRequest);
request.Waiter.Task.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
waitingRequests.TryAdd(request.Id, request);

if (globalRequestSubscription != null)
return request;

lock (mu)
{
// We shouldn't ever get an Argument exception because the ID is incrementing
// and since this is performant sensitive code, skipping an existence check.
waitingRequests.Add(request.Id, request);

if (globalRequestSubscription != null)
return request;

if (globalRequestSubscription == null)
globalRequestSubscription = subscribeAsync(string.Concat(globalRequestInbox, ".*"), null,
requestResponseHandler);
RequestResponseHandler);
}

return request;
Expand Down Expand Up @@ -3617,12 +3651,14 @@ private void clearPendingFlushCalls()
// Caller must lock
private void clearPendingRequestCalls()
{
foreach (var request in waitingRequests)
lock (mu)
{
request.Value.Waiter.TrySetCanceled();
foreach (var request in waitingRequests)
{
request.Value.Waiter.TrySetCanceled();
}
waitingRequests.Clear();
}

waitingRequests.Clear();
}


Expand Down
70 changes: 58 additions & 12 deletions src/Tests/IntegrationTests/TestBasic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public void TestRequestTimeout()
{
using (IConnection c = Context.OpenConnection())
{
Assert.Throws<NATSTimeoutException>(() => c.Request("foo", null, 50));
Assert.Throws<NATSTimeoutException>(() => c.Request("foo", null, 50));
}

Options opts = Context.GetTestOptions();
Expand Down Expand Up @@ -1415,7 +1415,7 @@ public void TestUrlArgument()
c.Close();
}

using(var c = Context.ConnectionFactory.CreateConnection(url1))
using (var c = Context.ConnectionFactory.CreateConnection(url1))
{
c.Close();
}
Expand Down Expand Up @@ -1543,7 +1543,7 @@ public void TestAsyncInfoProtocolPrune()
opts.Url = $"nats://127.0.0.1:{Context.Server1.Port}";

AutoResetEvent evDS = new AutoResetEvent(false);
opts.ServerDiscoveredEventHandler = (o, a) =>{evDS.Set();};
opts.ServerDiscoveredEventHandler = (o, a) => { evDS.Set(); };

AutoResetEvent evRC = new AutoResetEvent(false);
opts.ReconnectedEventHandler = (o, a) => { evRC.Set(); };
Expand Down Expand Up @@ -1609,13 +1609,13 @@ private bool listsEqual(string[] l1, string[] l2)
[Fact]
public void TestServersRandomize()
{
var serverList = new [] {
var serverList = new[] {
Context.DefaultServer.Url,
"nats://localhost:2",
"nats://localhost:3",
"nats://localhost:4",
"nats://localhsot:5",
"nats://localhost:6",
"nats://localhost:4",
"nats://localhsot:5",
"nats://localhost:6",
"nats://localhost:7"
};

Expand Down Expand Up @@ -1681,26 +1681,26 @@ public void TestSimpleUrlArgument()

// simple url
o.Url = "127.0.0.1";
using(Context.ConnectionFactory.CreateConnection(o)){}
using (Context.ConnectionFactory.CreateConnection(o)) { }

// servers with a simple hostname
o.Url = null;
o.Servers = new string[] { "127.0.0.1" };
using(var cn = Context.ConnectionFactory.CreateConnection(o))
using (var cn = Context.ConnectionFactory.CreateConnection(o))
cn.Close();

// simple url connect
using(var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost"))
using (var cn = Context.ConnectionFactory.CreateConnection("127.0.0.1, localhost"))
cn.Close();

// url with multiple hosts
o.Url = "127.0.0.1,localhost";
using (Context.ConnectionFactory.CreateConnection(o)) {}
using (Context.ConnectionFactory.CreateConnection(o)) { }

// servers with multiple hosts
o.Url = null;
o.Servers = new string[] { "127.0.0.1", "localhost" };
using(var cn = Context.ConnectionFactory.CreateConnection(o))
using (var cn = Context.ConnectionFactory.CreateConnection(o))
cn.Close();
}
}
Expand Down Expand Up @@ -1737,5 +1737,51 @@ public void TestServersOption()
Assert.ThrowsAny<NATSNoServersException>(() => Context.ConnectionFactory.CreateConnection());
}

/// <summary>
/// Temporary test for upcoming jetstream functionality. The jetstream server must be run
/// manually and loaded per the issue listed below. JetStream will remap reply subjects
/// for requests, so additional code had to be added to handle this. Future jetstream tests
/// will cover this and this test should be replaced.
///
/// # Start server with jetstream enabled.
/// $ ./nats-server -js
///
/// Create the stream
/// $ jsm str create
/// ? Stream Name foo
/// ? Subjects to consume foo.*
/// ? Storage backend memory
/// ? Retention Policy Limits
/// ? Message count limit -1
/// ? Message size limit -1
/// ? Maximum message age limit -1
/// ? Maximum individual message size -1
///
/// Create the server side consumer
/// $ jsm con create
/// ? Select a Stream foo
/// ? Consumer name bar
/// ? Delivery target
/// ? Start policy (all, last, 1h, msg sequence) all
/// ? Filter Stream by subject (blank for all)
/// ? Maximum Allowed Deliveries -1
///
/// Now manually run this test.
///
/// </summary>
[Fact(Skip = "Manual")]
public void TestJetstreamSubjectHandling()
{
// Start a NATS server with experimental -js feature and create a stream and consumer
// as described in issue https://github.com/nats-io/nats.net/issues/364

// publish a message into JS
using (var c = new ConnectionFactory().CreateConnection())
{
c.Publish("foo.inc", Encoding.UTF8.GetBytes("hello"));
c.Request("$JS.STREAM.foo.CONSUMER.bar.NEXT", Encoding.ASCII.GetBytes("1"), 1000);
}
}

} // class
} // namespace
1 change: 1 addition & 0 deletions src/Tests/IntegrationTests/TestReconnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public void TestQueueSubsOnReconnect()
{
AutoResetEvent reconnectEvent = new AutoResetEvent(false);
Options opts = getReconnectOptions();
opts.MaxReconnect = 32;

string subj = "foo.bar";
string qgroup = "workers";
Expand Down