Skip to content

Commit

Permalink
revert NewFloodSub interface change, use pointers for sendReq
Browse files Browse the repository at this point in the history
address self-review comments.
  • Loading branch information
vyzo committed Jan 13, 2018
1 parent 5ef13c7 commit 04146f5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 29 deletions.
21 changes: 6 additions & 15 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type PubSub struct {
topics map[string]map[peer.ID]struct{}

// sendMsg handles messages that have been validated
sendMsg chan sendReq
sendMsg chan *sendReq

peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache
Expand All @@ -85,10 +85,8 @@ type RPC struct {
from peer.ID
}

type Option func(*PubSub) error

// NewFloodSub returns a new FloodSub management object
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
ps := &PubSub{
host: h,
ctx: ctx,
Expand All @@ -100,27 +98,20 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan sendReq),
sendMsg: make(chan *sendReq),
myTopics: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
seenMessages: timecache.NewTimeCache(time.Second * 30),
counter: uint64(time.Now().UnixNano()),
}

for _, opt := range opts {
err := opt(ps)
if err != nil {
return nil, err
}
}

h.SetStreamHandler(ID, ps.handleNewStream)
h.Network().Notify((*PubSubNotif)(ps))

go ps.processLoop(ctx)

return ps, nil
return ps
}

// processLoop handles all inputs arriving on the channels
Expand Down Expand Up @@ -367,7 +358,7 @@ func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) {
return
}

sreq := sendReq{from: src, msg: msg}
sreq := &sendReq{from: src, msg: msg}
select {
case p.sendMsg <- sreq:
default:
Expand Down Expand Up @@ -422,7 +413,7 @@ loop:
}

// all validators were successful, send the message
p.sendMsg <- sendReq{
p.sendMsg <- &sendReq{
from: src,
msg: msg,
}
Expand Down
19 changes: 5 additions & 14 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,10 @@ func connectAll(t *testing.T, hosts []host.Host) {
}
}

func getPubsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub {
func getPubsubs(ctx context.Context, hs []host.Host) []*PubSub {
var psubs []*PubSub
for _, h := range hs {
ps, err := NewFloodSub(ctx, h, opts...)
if err != nil {
panic(err)
}
ps := NewFloodSub(ctx, h)
psubs = append(psubs, ps)
}
return psubs
Expand Down Expand Up @@ -294,14 +291,11 @@ func TestSelfReceive(t *testing.T) {

host := getNetHosts(t, ctx, 1)[0]

psub, err := NewFloodSub(ctx, host)
if err != nil {
t.Fatal(err)
}
psub := NewFloodSub(ctx, host)

msg := []byte("hello world")

err = psub.Publish("foobar", msg)
err := psub.Publish("foobar", msg)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -677,10 +671,7 @@ func TestSubReporting(t *testing.T) {
defer cancel()

host := getNetHosts(t, ctx, 1)[0]
psub, err := NewFloodSub(ctx, host)
if err != nil {
t.Fatal(err)
}
psub := NewFloodSub(ctx, host)

fooSub, err := psub.Subscribe("foo")
if err != nil {
Expand Down

0 comments on commit 04146f5

Please sign in to comment.