diff --git a/pubsub.go b/pubsub.go index 5902c00a..4635b388 100644 --- a/pubsub.go +++ b/pubsub.go @@ -696,6 +696,7 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO sub := &Subscription{ topic: td.GetName(), + ctx: p.ctx, ch: make(chan *Message, 32), peerEvtCh: make(chan PeerEvent, 1), diff --git a/subscription.go b/subscription.go index 45d957ec..b3ddf836 100644 --- a/subscription.go +++ b/subscription.go @@ -18,6 +18,7 @@ type Subscription struct { ch chan *Message cancelCh chan<- *Subscription err error + ctx context.Context peerEvtCh chan PeerEvent evtLogMx sync.Mutex @@ -49,7 +50,10 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) { } func (sub *Subscription) Cancel() { - sub.cancelCh <- sub + select { + case sub.cancelCh <- sub: + case <-sub.ctx.Done(): + } } func (sub *Subscription) close() {