Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic handler bug fixes #225

Merged
merged 3 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...
return d.server.FindPeers(ns, options.Limit)
}

type dummyDiscovery struct{}

func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return time.Hour, nil
}

func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
retCh := make(chan peer.AddrInfo)
go func() {
time.Sleep(time.Second)
close(retCh)
}()
return retCh, nil
}

func TestSimpleDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
35 changes: 29 additions & 6 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,13 @@ func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
}

resp := make(chan *Topic, 1)
t.p.addTopic <- &addTopicReq{
select {
case t.p.addTopic <- &addTopicReq{
topic: t,
resp: resp,
}:
case <-t.p.ctx.Done():
return nil, false, t.p.ctx.Err()
}
returnedTopic := <-resp

Expand Down Expand Up @@ -848,7 +852,11 @@ type topicReq struct {
// GetTopics returns the topics this node is subscribed to.
func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out}
select {
case p.getTopics <- &topicReq{resp: out}:
case <-p.ctx.Done():
return nil
}
return <-out
}

Expand Down Expand Up @@ -880,16 +888,23 @@ type listPeerReq struct {
// ListPeers returns a list of peers we are connected to in the given topic.
func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
p.getPeers <- &listPeerReq{
select {
case p.getPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return nil
}
return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
p.blacklistPeer <- pid
select {
case p.blacklistPeer <- pid:
case <-p.ctx.Done():
}
}

// RegisterTopicValidator registers a validator for topic.
Expand All @@ -910,7 +925,11 @@ func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...Val
}
}

p.addVal <- addVal
select {
case p.addVal <- addVal:
case <-p.ctx.Done():
return p.ctx.Err()
}
return <-addVal.resp
}

Expand All @@ -922,6 +941,10 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error {
resp: make(chan error, 1),
}

p.rmVal <- rmVal
select {
case p.rmVal <- rmVal:
case <-p.ctx.Done():
return p.ctx.Err()
}
return <-rmVal.resp
}
75 changes: 69 additions & 6 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -10,20 +11,33 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
topic string

evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}

mux sync.RWMutex
closed bool
}

// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, ErrTopicClosed
}

h := &TopicEventHandler{
err: nil,
topic: t,
err: nil,

evtLog: make(map[peer.ID]EventType),
evtLogCh: make(chan struct{}, 1),
Expand All @@ -37,7 +51,9 @@ func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler,
}

done := make(chan struct{}, 1)
t.p.eval <- func() {

select {
case t.p.eval <- func() {
tmap := t.p.topics[t.topic]
for p := range tmap {
h.evtLog[p] = PeerJoin
Expand All @@ -47,6 +63,9 @@ func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler,
t.evtHandlers[h] = struct{}{}
t.evtHandlerMux.Unlock()
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}

<-done
Expand All @@ -67,6 +86,12 @@ func (t *Topic) sendNotification(evt PeerEvent) {
// Note that subscription is not an instanteneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, ErrTopicClosed
}

sub := &Subscription{
topic: t.topic,
ch: make(chan *Message, 32),
Expand All @@ -84,9 +109,13 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {

t.p.disc.Discover(sub.topic)

t.p.addSub <- &addSubReq{
select {
case t.p.addSub <- &addSubReq{
sub: sub,
resp: out,
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}

return <-out, nil
Expand All @@ -103,6 +132,12 @@ type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return ErrTopicClosed
}

seqno := t.p.nextSeqno()
id := t.p.host.ID()
m := &pb.Message{
Expand Down Expand Up @@ -131,7 +166,11 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

t.p.publish <- &Message{m, id}
select {
case t.p.publish <- &Message{m, id}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

return nil
}
Expand All @@ -148,13 +187,37 @@ func WithReadiness(ready RouterReady) PubOpt {
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
return nil
}

req := &rmTopicReq{t, make(chan error, 1)}
t.p.rmTopic <- req
return <-req.resp

select {
case t.p.rmTopic <- req:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

err := <-req.resp

if err == nil {
t.closed = true
}

return err
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return []peer.ID{}
}

return t.p.ListPeers(t.topic)
}

Expand Down
Loading