-
Notifications
You must be signed in to change notification settings - Fork 698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ADDED] Conn.Barrier() API #338
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/nats-io/go-nats/util" | ||
|
@@ -351,6 +352,12 @@ type Msg struct { | |
Data []byte | ||
Sub *Subscription | ||
next *Msg | ||
barrier *barrierInfo | ||
} | ||
|
||
type barrierInfo struct { | ||
refs int64 | ||
f func() | ||
} | ||
|
||
// Tracks various stats received and sent on this connection, | ||
|
@@ -1571,6 +1578,13 @@ func (nc *Conn) waitForMsgs(s *Subscription) { | |
if s.pHead == nil { | ||
s.pTail = nil | ||
} | ||
if m.barrier != nil { | ||
s.mu.Unlock() | ||
if atomic.AddInt64(&m.barrier.refs, -1) == 0 { | ||
m.barrier.f() | ||
} | ||
continue | ||
} | ||
s.pMsgs-- | ||
s.pBytes -= len(m.Data) | ||
} | ||
|
@@ -1599,6 +1613,19 @@ func (nc *Conn) waitForMsgs(s *Subscription) { | |
break | ||
} | ||
} | ||
// Check for barrier messages | ||
s.mu.Lock() | ||
for m := s.pHead; m != nil; m = s.pHead { | ||
if m.barrier != nil { | ||
s.mu.Unlock() | ||
if atomic.AddInt64(&m.barrier.refs, -1) == 0 { | ||
m.barrier.f() | ||
} | ||
s.mu.Lock() | ||
} | ||
s.pHead = m.next | ||
} | ||
s.mu.Unlock() | ||
} | ||
|
||
// processMsg is called by parse and will place the msg on the | ||
|
@@ -3006,3 +3033,48 @@ func (nc *Conn) TLSRequired() bool { | |
defer nc.mu.Unlock() | ||
return nc.info.TLSRequired | ||
} | ||
|
||
// Barrier schedules the given function `f` to all registered asynchronous | ||
// subscriptions. | ||
// Only the last subscription to see this barrier will invoke the function. | ||
// If no subscription is registered at the time of this call, `f()` is invoked | ||
// right away. | ||
// ErrConnectionClosed is returned if the connection is closed prior to | ||
// the call. | ||
func (nc *Conn) Barrier(f func()) error { | ||
nc.mu.Lock() | ||
defer nc.mu.Unlock() | ||
if nc.isClosed() { | ||
return ErrConnectionClosed | ||
} | ||
nc.subsMu.Lock() | ||
defer nc.subsMu.Unlock() | ||
// Need to figure out how many non chan subscriptions there are | ||
numSubs := 0 | ||
for _, sub := range nc.subs { | ||
if sub.typ == AsyncSubscription { | ||
numSubs++ | ||
} | ||
} | ||
if numSubs == 0 { | ||
f() | ||
return nil | ||
} | ||
barrier := &barrierInfo{refs: int64(numSubs), f: f} | ||
for _, sub := range nc.subs { | ||
sub.mu.Lock() | ||
if sub.mch == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be consistent with above when counting, but if its not, does f() never get called or called too early? Should we care? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If barrier is address, could we just count in place here and add it to the barrier struct after this iteration? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment 1: the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I misunderstood comment 1. I do not count sync and chan subscriptions because we don't have real control on when messages get dequeued (for sure for chan subscription, I would have no place to check for that special message). |
||
msg := &Msg{barrier: barrier} | ||
// Push onto the async pList | ||
if sub.pTail != nil { | ||
sub.pTail.next = msg | ||
} else { | ||
sub.pHead = msg | ||
sub.pCond.Signal() | ||
} | ||
sub.pTail = msg | ||
} | ||
sub.mu.Unlock() | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For small numbers should be fine, but should we consider tracking a variable on nc? nc.numAsyncSubs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather leave this for now, avoiding any issue with discrepancies between +1 and -1 when removing a subscription.