From 04146f5f3c98bffd1a3c893a46ade29b4a18f40f Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Jan 2018 20:23:18 +0200 Subject: [PATCH] revert NewFloodSub interface change, use pointers for sendReq address self-review comments. --- floodsub.go | 21 ++++++--------------- floodsub_test.go | 19 +++++-------------- 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/floodsub.go b/floodsub.go index 563f70c3..a60249dd 100644 --- a/floodsub.go +++ b/floodsub.go @@ -59,7 +59,7 @@ type PubSub struct { topics map[string]map[peer.ID]struct{} // sendMsg handles messages that have been validated - sendMsg chan sendReq + sendMsg chan *sendReq peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -85,10 +85,8 @@ type RPC struct { from peer.ID } -type Option func(*PubSub) error - // NewFloodSub returns a new FloodSub management object -func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { +func NewFloodSub(ctx context.Context, h host.Host) *PubSub { ps := &PubSub{ host: h, ctx: ctx, @@ -100,7 +98,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), getTopics: make(chan *topicReq), - sendMsg: make(chan sendReq), + sendMsg: make(chan *sendReq), myTopics: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), @@ -108,19 +106,12 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err counter: uint64(time.Now().UnixNano()), } - for _, opt := range opts { - err := opt(ps) - if err != nil { - return nil, err - } - } - h.SetStreamHandler(ID, ps.handleNewStream) h.Network().Notify((*PubSubNotif)(ps)) go ps.processLoop(ctx) - return ps, nil + return ps } // processLoop handles all inputs arriving on the channels @@ -367,7 +358,7 @@ func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) { return } - sreq := sendReq{from: src, msg: msg} + sreq := &sendReq{from: src, msg: msg} select { case p.sendMsg <- sreq: default: @@ -422,7 +413,7 @@ loop: } // all validators were successful, send the message - p.sendMsg <- sendReq{ + p.sendMsg <- &sendReq{ from: src, msg: msg, } diff --git a/floodsub_test.go b/floodsub_test.go index 351f5841..9b134045 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -81,13 +81,10 @@ func connectAll(t *testing.T, hosts []host.Host) { } } -func getPubsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { +func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub { var psubs []*PubSub for _, h := range hs { - ps, err := NewFloodSub(ctx, h, opts...) - if err != nil { - panic(err) - } + ps := NewFloodSub(ctx, h) psubs = append(psubs, ps) } return psubs @@ -294,14 +291,11 @@ func TestSelfReceive(t *testing.T) { host := getNetHosts(t, ctx, 1)[0] - psub, err := NewFloodSub(ctx, host) - if err != nil { - t.Fatal(err) - } + psub := NewFloodSub(ctx, host) msg := []byte("hello world") - err = psub.Publish("foobar", msg) + err := psub.Publish("foobar", msg) if err != nil { t.Fatal(err) } @@ -677,10 +671,7 @@ func TestSubReporting(t *testing.T) { defer cancel() host := getNetHosts(t, ctx, 1)[0] - psub, err := NewFloodSub(ctx, host) - if err != nil { - t.Fatal(err) - } + psub := NewFloodSub(ctx, host) fooSub, err := psub.Subscribe("foo") if err != nil {