Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Conn.Barrier() API #338

Merged
merged 1 commit into from
Jan 19, 2018
Merged

[ADDED] Conn.Barrier() API #338

merged 1 commit into from
Jan 19, 2018

Conversation

kozlovic
Copy link
Member

Suppose that you have a connection with two subscriptions, let's
call them "pub" and "close".
Let's say that you know that when receiving a message on "close",
no new message will arrive on "pub" (because the sender does
send "pub" and then a final message on "close"). But because
"pub" and "close" are two different subscriptions, there is no
guarantee that the message on "close" is not going to be dispatched
before the last message on "pub".

The Barrier() API can be used to solve this issue. In the example
above, if the message callback on "close" were to call that API,
this will ensure that all pending messages on "pub" have been
dipatched before the provided function in the Barrier() API gets
executed.

See test/TestBarrier() test for more details.

Suppose that you have a connection with two subscriptions, let's
call them "pub" and "close".
Let's say that you know that when receiving a message on "close",
no new message will arrive on "pub" (because the sender does
send "pub" and then a final message on "close"). But because
"pub" and "close" are two different subscriptions, there is no
guarantee that the message on "close" is not going to be dispatched
before the last message on "pub".

The Barrier() API can be used to solve this issue. In the example
above, if the message callback on "close" were to call that API,
this will ensure that all pending messages on "pub" have been
dipatched before the provided function in the Barrier() API gets
executed.

See `test/TestBarrier()` test for more details.
@coveralls
Copy link

Coverage Status

Coverage increased (+0.1%) to 94.639% when pulling 3215d38 on add_barrier_api into d66cb54 on master.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good, a few comments. Question, could I not do this using channels where I could add those subscriptions such that they share a channel and a Go routine? Or set of Go routines depending on how strict you want the ordering as per invoke a function vs complete it or guarantee they are last.

nc.subsMu.Lock()
defer nc.subsMu.Unlock()
// Need to figure out how many non chan subscriptions there are
numSubs := 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For small numbers should be fine, but should we consider tracking a variable on nc? nc.numAsyncSubs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather leave this for now, avoiding any issue with discrepancies between +1 and -1 when removing a subscription.

barrier := &barrierInfo{refs: int64(numSubs), f: f}
for _, sub := range nc.subs {
sub.mu.Lock()
if sub.mch == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be consistent with above when counting, but if its not, does f() never get called or called too early? Should we care?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If barrier is address, could we just count in place here and add it to the barrier struct after this iteration?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment 1: the nc.subsMutex is held for the duration of this call so no subscription can be removed after counting it.
comment 2: no because we still do individual sub locking, so once we post the barrier message, it is possible that a subscription processes it (after releasing the sub.mu lock here) before we have post them all. The dispatcher will possibly decrement below 0 or fire too soon.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstood comment 1. I do not count sync and chan subscriptions because we don't have real control on when messages get dequeued (for sure for chan subscription, I would have no place to check for that special message).

@kozlovic
Copy link
Member Author

@derekcollison Not sure if I follow, but if the idea would be to use single go channel to use for several ChanSubscriptions it means that I would have a single go routine dequeueing and that routine would have to check message subject, then invoke corresponding function for processing. This limits the throughput. It is nice to have the ability to dispatch concurrently when we can, but have a way to have a barrier when we need.

@derekcollison
Copy link
Member

You could queue the closed over f() with message etc. and dispatch to a group of Go routines.

@derekcollison
Copy link
Member

LGTM

@kozlovic kozlovic merged commit 78ec4b9 into master Jan 19, 2018
@kozlovic kozlovic deleted the add_barrier_api branch January 19, 2018 02:35
kozlovic added a commit that referenced this pull request Mar 4, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants