Skip to content

Commit

Permalink
fixed closed Topic handles still being able to perform some actions o…
Browse files Browse the repository at this point in the history
…n the topic
  • Loading branch information
aschmahmann committed Nov 1, 2019
1 parent 899f9cd commit c57bc57
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 1 deletion.
15 changes: 15 additions & 0 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...
return d.server.FindPeers(ns, options.Limit)
}

type dummyDiscovery struct {}

func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return time.Hour, nil
}

func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
retCh := make(chan peer.AddrInfo)
go func (){
time.Sleep(time.Second)
close(retCh)
}()
return retCh, nil
}

func TestSimpleDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
45 changes: 44 additions & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -10,18 +11,30 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// errTopicClosed is returned if a Topic is utilized after it has been closed
var errTopicClosed = errors.New("this Topic is closed, try opening a new one")

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
topic string

evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}

mux sync.RWMutex
closed bool
}

// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, errTopicClosed
}

h := &TopicEventHandler{
topic: t,
err: nil,
Expand Down Expand Up @@ -68,6 +81,12 @@ func (t *Topic) sendNotification(evt PeerEvent) {
// Note that subscription is not an instanteneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, errTopicClosed
}

sub := &Subscription{
topic: t.topic,
ch: make(chan *Message, 32),
Expand Down Expand Up @@ -104,6 +123,12 @@ type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return errTopicClosed
}

seqno := t.p.nextSeqno()
id := t.p.host.ID()
m := &pb.Message{
Expand Down Expand Up @@ -149,13 +174,31 @@ func WithReadiness(ready RouterReady) PubOpt {
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
return nil
}

req := &rmTopicReq{t, make(chan error, 1)}
t.p.rmTopic <- req
return <-req.resp
err := <-req.resp

if err == nil {
t.closed = true
}

return err
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return []peer.ID{}
}

return t.p.ListPeers(t.topic)
}

Expand Down
95 changes: 95 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pubsub

import (
"bytes"
"context"
"fmt"
"sync"
Expand Down Expand Up @@ -110,6 +111,100 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic
}
}

func TestTopicReuse(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numHosts = 2
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)

sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{}))
receiver := getPubsub(ctx, hosts[1])

connectAll(t, hosts)

// Sender creates topic
sendTopic, err := sender.Join(topicID)
if err != nil {
t.Fatal(err)
}

// Receiver creates and subscribes to the topic
receiveTopic, err := receiver.Join(topicID)
if err != nil {
t.Fatal(err)
}

sub, err := receiveTopic.Subscribe()
if err != nil {
t.Fatal(err)
}

firstMsg := []byte("1")
if err := sendTopic.Publish(ctx, firstMsg, WithReadiness(MinTopicSize(1))); err != nil {
t.Fatal(err)
}

msg, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(msg.GetData(), firstMsg) != 0 {
t.Fatal("received incorrect message")
}

if err := sendTopic.Close(); err != nil {
t.Fatal(err)
}

// Recreate the same topic
newSendTopic, err := sender.Join(topicID)
if err != nil {
t.Fatal(err)
}

// Try sending data with original topic
illegalSend := []byte("illegal")
if err := sendTopic.Publish(ctx, illegalSend); err != errTopicClosed {
t.Fatal(err)
}

timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second * 2)
defer timeoutCancel()
msg, err = sub.Next(timeoutCtx)
if err != context.DeadlineExceeded {
if err != nil {
t.Fatal(err)
}
if bytes.Compare(msg.GetData(), illegalSend) != 0 {
t.Fatal("received incorrect message from illegal topic")
}
t.Fatal("received message sent by illegal topic")
}
timeoutCancel()

// Try cancelling the new topic by using the original topic
if err := sendTopic.Close(); err != nil {
t.Fatal(err)
}

secondMsg := []byte("2")
if err := newSendTopic.Publish(ctx, secondMsg); err != nil {
t.Fatal(err)
}

timeoutCtx, timeoutCancel = context.WithTimeout(ctx, time.Second * 2)
defer timeoutCancel()
msg, err = sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(msg.GetData(), secondMsg) != 0 {
t.Fatal("received incorrect message")
}
}

func TestTopicEventHandlerCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit c57bc57

Please sign in to comment.