Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow peer to be in Relay ONLY mode for a topic #208

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ require (
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
)
139 changes: 139 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,145 @@ func TestGossipsubGossip(t *testing.T) {
time.Sleep(time.Second * 2)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add some tests for Floodsub as well.

func TestRelayOnlyPeerForwardsMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 8)
psubs := getPubsubs(ctx, hosts)

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
connect(t, hosts[2], hosts[3])
connect(t, hosts[1], hosts[4])

connect(t, hosts[0], hosts[5])
connect(t, hosts[5], hosts[6])
connect(t, hosts[6], hosts[7])
/*
R= Relay ONLY
S = Subscriber

[0](R) -> [1](R) -> [2](R) -> [3](S)
| L->[4](S)
v
[5](R)
|
v
[6](R) -> [7](S)
*/

var subs []*Subscription
for i, ps := range psubs {
if i == 3 || i == 4 || i == 7 {
ch, err := ps.Subscribe("foo")
if err != nil {
panic(err)
}
subs = append(subs, ch)
} else {
_, err := ps.Join("foo")
if err != nil {
t.Fatal(err)
}
}
}

// wait for heartbeats to build mesh
time.Sleep(time.Second * 2)

// assert that published message is received
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

owner := rand.Intn(len(psubs))

psubs[owner].Publish("foo", msg)

for _, sub := range subs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}

func TestGossipsubGossipWithSomeRelayOnlyPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 15)
Copy link
Contributor Author

@aarshkshah1992 aarshkshah1992 Oct 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change number of hosts to 20 for consistency.


psubs := getGossipsubs(ctx, hosts)

subscribeToTopic := func(topic string) []*Subscription {
var msgs []*Subscription

for i, ps := range psubs {
if i%4 != 0 {
subch, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
} else {
_, err := ps.Join(topic)
if err != nil {
t.Fatal(err)
}
}
}
return msgs
}

msgs := subscribeToTopic("foobar")
xmsgs := subscribeToTopic("bazcrux")

denseConnect(t, hosts)

// wait for heartbeats to build mesh
time.Sleep(time.Second * 2)

for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

owner := rand.Intn(len(psubs))

psubs[owner].Publish("foobar", msg)
psubs[owner].Publish("bazcrux", msg)

for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}

for _, sub := range xmsgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}

// wait a bit to have some gossip interleaved
time.Sleep(time.Millisecond * 100)
}

// and wait for some gossip flushing
time.Sleep(time.Second * 2)
}

func TestGossipsubGossipPiggyback(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
Loading