Skip to content

Commit

Permalink
defer sub cleanup to avoid the lock inversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Radu Popovici committed Jul 29, 2022
1 parent 40e60a9 commit 3e2002b
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs
sc.subMap[sub.inbox] = sub
sc.Unlock()

doClean := true
defer func() {
if doClean {
sc.Lock()
//Un-register subscription.
delete(sc.subMap, sub.inbox)
sc.Unlock()
}
}()

// Hold lock throughout.
sub.Lock()
defer sub.Unlock()
Expand All @@ -262,9 +272,6 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs
// Listen for actual messages.
nsub, err := sc.nc.Subscribe(sub.inbox, sc.processMsg)
if err != nil {
sc.Lock()
delete(sc.subMap, sub.inbox)
sc.Unlock()
return nil, err
}
nsub.SetPendingLimits(-1, -1)
Expand Down Expand Up @@ -315,9 +322,6 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs
// Report this error to the user.
err = ErrSubReqTimeout
}
sc.Lock()
delete(sc.subMap, sub.inbox)
sc.Unlock()
return nil, err
}
r := &pb.SubscriptionResponse{}
Expand All @@ -331,6 +335,9 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs
}
sub.ackInbox = r.AckInbox

// Prevent cleanup on exit.
doClean = false

return sub, nil
}

Expand Down

0 comments on commit 3e2002b

Please sign in to comment.