Skip to content

Commit

Permalink
Merge pull request #201 from lukesolo/subscription-fix-cancel
Browse files Browse the repository at this point in the history
add PubSub's context to Subscription
  • Loading branch information
vyzo authored Oct 1, 2019
2 parents 9f04364 + 9d03237 commit dba8299
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
1 change: 1 addition & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO

sub := &Subscription{
topic: td.GetName(),
ctx: p.ctx,

ch: make(chan *Message, 32),
peerEvtCh: make(chan PeerEvent, 1),
Expand Down
6 changes: 5 additions & 1 deletion subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Subscription struct {
ch chan *Message
cancelCh chan<- *Subscription
err error
ctx context.Context

peerEvtCh chan PeerEvent
evtLogMx sync.Mutex
Expand Down Expand Up @@ -49,7 +50,10 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
}

func (sub *Subscription) Cancel() {
sub.cancelCh <- sub
select {
case sub.cancelCh <- sub:
case <-sub.ctx.Done():
}
}

func (sub *Subscription) close() {
Expand Down

0 comments on commit dba8299

Please sign in to comment.