Skip to content

Commit

Permalink
pubsub and topic methods now return error if the pubsub context has b…
Browse files Browse the repository at this point in the history
…een cancelled instead of hanging
  • Loading branch information
aschmahmann committed Nov 3, 2019
1 parent ad97d9b commit 2285cb5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
35 changes: 29 additions & 6 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,13 @@ func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
}

resp := make(chan *Topic, 1)
t.p.addTopic <- &addTopicReq{
select {
case t.p.addTopic <- &addTopicReq{
topic: t,
resp: resp,
}:
case <-t.p.ctx.Done():
return nil, false, t.p.ctx.Err()
}
returnedTopic := <-resp

Expand Down Expand Up @@ -848,7 +852,11 @@ type topicReq struct {
// GetTopics returns the topics this node is subscribed to.
func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out}
select {
case p.getTopics <- &topicReq{resp: out}:
case <-p.ctx.Done():
return []string{}
}
return <-out
}

Expand Down Expand Up @@ -880,16 +888,23 @@ type listPeerReq struct {
// ListPeers returns a list of peers we are connected to in the given topic.
func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
p.getPeers <- &listPeerReq{
select {
case p.getPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return []peer.ID{}
}
return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
p.blacklistPeer <- pid
select {
case p.blacklistPeer <- pid:
case <-p.ctx.Done():
}
}

// RegisterTopicValidator registers a validator for topic.
Expand All @@ -910,7 +925,11 @@ func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...Val
}
}

p.addVal <- addVal
select {
case p.addVal <- addVal:
case <-p.ctx.Done():
return p.ctx.Err()
}
return <-addVal.resp
}

Expand All @@ -922,6 +941,10 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error {
resp: make(chan error, 1),
}

p.rmVal <- rmVal
select {
case p.rmVal <- rmVal:
case <-p.ctx.Done():
return p.ctx.Err()
}
return <-rmVal.resp
}
27 changes: 23 additions & 4 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler,
}

done := make(chan struct{}, 1)
t.p.eval <- func() {

select {
case t.p.eval <- func() {
tmap := t.p.topics[t.topic]
for p := range tmap {
h.evtLog[p] = PeerJoin
Expand All @@ -61,6 +63,9 @@ func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler,
t.evtHandlers[h] = struct{}{}
t.evtHandlerMux.Unlock()
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}

<-done
Expand Down Expand Up @@ -104,9 +109,13 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {

t.p.disc.Discover(sub.topic)

t.p.addSub <- &addSubReq{
select {
case t.p.addSub <- &addSubReq{
sub: sub,
resp: out,
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}

return <-out, nil
Expand Down Expand Up @@ -157,7 +166,11 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

t.p.publish <- &Message{m, id}
select {
case t.p.publish <- &Message{m, id}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

return nil
}
Expand All @@ -181,7 +194,13 @@ func (t *Topic) Close() error {
}

req := &rmTopicReq{t, make(chan error, 1)}
t.p.rmTopic <- req

select {
case t.p.rmTopic <- req:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

err := <-req.resp

if err == nil {
Expand Down

0 comments on commit 2285cb5

Please sign in to comment.