-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix JetStream Reply Subjects #365
Conversation
Signed-off-by: Colin Sullivan <[email protected]>
Signed-off-by: Colin Sullivan <[email protected]>
Signed-off-by: Colin Sullivan <[email protected]>
src/NATS.Client/Conn.cs
Outdated
{ | ||
// We have a jetstream subject (remapped), so if there's only one | ||
// request assume we're OK and handle it. | ||
if (waitingRequests.Count == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're not locked under mu
here like in the go client's code, so I don't think this section is thread safe anymore:
https://github.com/nats-io/nats.go/blob/93a68d7e795f11c0aa30cfa38cf9a4702ae8d8b7/nats.go#L2630-L2670
I need to read up more on JetStream to have a better idea as to why only one request would be "out" with this model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, thanks. I'll fix that.
With JetStream, you make a request to a "consumer" of a stream. JetStream allows for the aggregation of multiple subjects (including wildcards), so it re-writes the subject (overriding the request's reply subject) to have the subject name of the original message. A good place to start with JetStream right now is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading that it appears we would need our consumer to tell us it is a JetStream consumer, so we could set it up properly in the waiting requests dictionary. Based on the golang code it seems that if you use a NATS connection for JetStream, you must ONLY use it for JetStream (given the map count logic). Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about that - JetStream is a standard NATS server with the JetStream subsystem enabled, so subscriptions on a connection can receive both persistent messages from a JetStream server side consumer and messages from standard producers. On the same connection, one could make request to a service listening to "foo", in which case I'd get back a message on the inbox + request id reply subject originally used. One could also make a request to a JetStream consumer (we used to call it observable), which would send the client a message using the original subject as the subject. So if I've setup JetStream to aggregate subjects "foo" and "bar" under a "baz" JetStream server consumer, I'd make a request to "baz" and the subject could be overridden by the NATS server to be "foo", to provide the original subject, instead of the original reply subject we gave the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're using async request-reply, why couldn't we have a situation where we'd have a pending request from a non-JS source and a JS sourced request at the same time? e.g. waitingRequests.Count > 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could and so just make a best effort attempt there. Since we've lost information in the system (the reply subject of the original message) we cannot accurately determine which received message should correlate to which request we're tracking. It's not ideal, but this lets the application retry.
Hmm... this situation would create a memory leak with ghost jetstream requests.
I suppose in this case we could search the outstanding requests to check if there's only one jetstream request to be a little better about this.
@kozlovic wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the code (even in Go) is confusing. There is too many assumptions and even if you don't mix non-JS and JS requests, as long as you have more than one request it would not work, which is not reasonable. I think this needs to be discussed and revisited.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case let's keep the current code and if go is updated we'll make appropriate changes to the other clients (including this one). Thanks @kozlovic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@watfordgnf, are we good to go?
Signed-off-by: Colin Sullivan <[email protected]>
Signed-off-by: Colin Sullivan <[email protected]>
src/NATS.Client/Conn.cs
Outdated
@@ -2592,47 +2592,46 @@ public void Publish(string subject, string reply, byte[] data, int offset, int c | |||
|
|||
protected Msg request(string subject, byte[] data, int offset, int count, int timeout) => requestSync(subject, data, offset, count, timeout); | |||
|
|||
private void removeOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _); | |||
private void RemoveOutstandingRequest(string requestId) => waitingRequests.TryRemove(requestId, out _); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would have to be done while holding mu
as well as the addition to waitingRequests
in setupRequest
. At that point it wouldn't be advantageous to use a ConcurrentDictionary
as we'd be handling its concurrency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, and ConcurrentDictionary is causing problems in Unity right now. I'll give it a shot.
Signed-off-by: Colin Sullivan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. (Granted it may not be the correct approach, but that's what the Go client is doing at the moment. May need to be revisited later).
JetStream will remap reply subjects for requests. The request handling code needed to be updated to handle this. This mirrors the go client's logic.
Resolves #364.
Signed-off-by: Colin Sullivan [email protected]