-
Notifications
You must be signed in to change notification settings - Fork 700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix new-style request leak #299
Conversation
The new request style was leaking channels in the respMap when requests timed out. Now we remove them from the map when the request times out. This also simplifies the respMux setup and prevents multiple resp subscriptions from being created on the server.
So of course I looked at that as well, and coded it up that way first but realized I was using the Once mutex for creation and then the Conn's mutex for accessing etc. So thought it would show up as a race. |
nats.go
Outdated
// Make sure scoped subscription is setup only once on first call to | ||
// Request(). | ||
var err error | ||
nc.respSetup.Do(func() { err = nc.createRespMux() }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All calls will block until createRespMux()
returns, right? That is, say "thread" 1 is first to go, is in the middle of executing createRespMux()
and a second "thread" executes nc.respSetup.Do()
, then the call will block, right? If not, then the code would not be correct.
The only drawback with this (and previous) approach is that we lock nc, use now this lock (I am sure it is implemented with locking/atomic), then again lock nc to create the respInbox. In the C client, I initialize map and respInbox in the lock above, then only when we need to create the sub there is extra locking required. So after initial creation of subscription, I require only 1 lock, here it is 3 per request. That's fine, we can iterate again later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Once struct and hence the Do has both a mutex and atomic for fast path evaluation for when the function has already been called. But this again is mixing and matching the Once mutex with the Connection mutex. But that may be ok in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm OK with reverting the sync.Once
change if we don't want it. The important thing in this PR is the map cleanup on request timeout, though I don't like the current race where multiple resp subscriptions can get created and then are subsequently unsubscribed, which this change prevents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do with think their is a race using different locks to protect the same structure fields? Should we try a test that spins up some Go routines and then they all try to subscribe at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potentially. I will write a test and see if I can trigger a race. Otherwise, we might just be better off doing what Ivan did in cnats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think sync.Once() is great, but we could leave the use of it only when we detect that the respMux is not yet created. So it could be a mix of what we do in c-nats and use of sync.Once.
That is, when first grabbing nc' mutex, we could setup the respMap, etc.. if not yet initialized, and prepare the respInbox and put it in the map, then snapshot if we need to create the subsription: createSub := nc.respMux == nil
. When releasing the nc's mutex, check createSub
and invoke sync.Once, but this time just to create the subscription (not the map, which would already be done), then in the function, setup respMux under nc.mu protection please.
Again, in this case that means that we would grab nc.mu only once per request, except at initialization time where we may call sync.Once from multiple go-routines (if there are parallel Requests).
(Alternatively, or if I am not making sense, you could simply fix the map leak on timeout condition - which I had addressed in the c client and wanted to report back but did not have time, and then have I could submit a PR for the rest if it makes it easier to understand - and see if what I say actually works :-)).
nats.go
Outdated
defer s.Unsubscribe() | ||
} | ||
nc.mu.Unlock() | ||
nc.respSub = ginbox |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have locking around this? What if the connection is closed from another go-routine?
nats.go
Outdated
// _INBOX wildcard | ||
ginbox := fmt.Sprintf("%s.*", NewInbox()) | ||
nc.mu.Lock() | ||
ginbox := nc.respSub |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need this tmp variable now it seems.
@kozlovic implemented what you suggested. I realized there is a lot of duplication between |
nats.go
Outdated
ginbox := nc.respSub | ||
nc.mu.Unlock() | ||
s, err := nc.Subscribe(ginbox, nc.respHandler) | ||
s, err := nc.Subscribe(nc.respSub, nc.respHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies, I now see that you were snapshotting the subject from the *Conn struct, which is probably needed since now line 2002 accesses it without a lock.
if nc.respMap == nil { | ||
// _INBOX wildcard | ||
nc.respSub = fmt.Sprintf("%s.*", NewInbox()) | ||
nc.respMap = make(map[string]chan *Msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would then snapshot respSub here (like wsub := nc.respSub
) and the pass it to nc.createRespMux()
.
return nil, err | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to remove from map on timeout here too?
a67374a
to
301a5b1
Compare
The new request style was leaking channels in the respMap when requests
timed out. Now we remove them from the map when the request times out.
This also simplifies the respMux setup and prevents multiple resp
subscriptions from being created on the server.
@nats-io/core