From ca67b4d722bdc4cfedac2072be7d995e8903c56c Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 11 Oct 2019 22:08:40 +0800 Subject: [PATCH] allow peer to be in Relay ONLY mode for a topic --- go.mod | 1 + gossipsub_test.go | 139 ++++++++++++++++++++++++++++++++++++++++ pubsub.go | 160 ++++++++++++++++++++++++++++++++++++++++------ relay.go | 20 ++++++ subscription.go | 2 + 5 files changed, 304 insertions(+), 18 deletions(-) create mode 100644 relay.go diff --git a/go.mod b/go.mod index c22fa9b7..5c9ba756 100644 --- a/go.mod +++ b/go.mod @@ -9,5 +9,6 @@ require ( github.com/libp2p/go-libp2p-swarm v0.1.0 github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multistream v0.1.0 + github.com/pkg/errors v0.8.1 github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee ) diff --git a/gossipsub_test.go b/gossipsub_test.go index 67af2b76..37fe9fe9 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -363,6 +363,145 @@ func TestGossipsubGossip(t *testing.T) { time.Sleep(time.Second * 2) } +func TestRelayOnlyPeerForwardsMessage(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 8) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + connect(t, hosts[2], hosts[3]) + connect(t, hosts[1], hosts[4]) + + connect(t, hosts[0], hosts[5]) + connect(t, hosts[5], hosts[6]) + connect(t, hosts[6], hosts[7]) + /* + R= Relay ONLY + S = Subscriber + + [0](R) -> [1](R) -> [2](R) -> [3](S) + | L->[4](S) + v + [5](R) + | + v + [6](R) -> [7](S) + */ + + var subs []*Subscription + for i, ps := range psubs { + if i == 3 || i == 4 || i == 7 { + ch, err := ps.Subscribe("foo") + if err != nil { + panic(err) + } + subs = append(subs, ch) + } else { + _, err := ps.Join("foo") + if err != nil { + t.Fatal(err) + } + } + } + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + // assert that published message is received + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foo", msg) + + for _, sub := range subs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipsubGossipWithSomeRelayOnlyPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getNetHosts(t, ctx, 15) + + psubs := getGossipsubs(ctx, hosts) + + subscribeToTopic := func(topic string) []*Subscription { + var msgs []*Subscription + + for i, ps := range psubs { + if i%4 != 0 { + subch, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + msgs = append(msgs, subch) + } else { + _, err := ps.Join(topic) + if err != nil { + t.Fatal(err) + } + } + } + return msgs + } + + msgs := subscribeToTopic("foobar") + xmsgs := subscribeToTopic("bazcrux") + + denseConnect(t, hosts) + + // wait for heartbeats to build mesh + time.Sleep(time.Second * 2) + + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + psubs[owner].Publish("foobar", msg) + psubs[owner].Publish("bazcrux", msg) + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + for _, sub := range xmsgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // wait a bit to have some gossip interleaved + time.Sleep(time.Millisecond * 100) + } + + // and wait for some gossip flushing + time.Sleep(time.Second * 2) +} + func TestGossipsubGossipPiggyback(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pubsub.go b/pubsub.go index 4635b388..2cef7dc6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -10,6 +10,7 @@ import ( "time" pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/pkg/errors" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -51,6 +52,9 @@ type PubSub struct { // addSub is a control channel for us to add and remove subscriptions addSub chan *addSubReq + // addRelay is a control channel for us to add relays + addRelay chan *addRelayReq + // get list of topics we are subscribed to getTopics chan *topicReq @@ -58,7 +62,10 @@ type PubSub struct { getPeers chan *listPeerReq // send subscription here to cancel it - cancelCh chan *Subscription + cancelSubCh chan *Subscription + + // send relay here to cancel it + cancelRelayCh chan *Relay // a notification channel for new peer connections newPeers chan peer.ID @@ -75,6 +82,10 @@ type PubSub struct { // The set of topics we are subscribed to myTopics map[string]map[*Subscription]struct{} + // The set of topics we relay for. 'myTopics' should be a subset of this as we do not + // currently allow Subscription without Relay + relayTopics map[string]map[*Relay]struct{} + // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} @@ -166,15 +177,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option newPeerStream: make(chan network.Stream), newPeerError: make(chan peer.ID), peerDead: make(chan peer.ID), - cancelCh: make(chan *Subscription), + cancelSubCh: make(chan *Subscription), + cancelRelayCh: make(chan *Relay), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), + addRelay: make(chan *addRelayReq), getTopics: make(chan *topicReq), sendMsg: make(chan *sendReq, 32), addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), eval: make(chan func()), myTopics: make(map[string]map[*Subscription]struct{}), + relayTopics: make(map[string]map[*Relay]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), blacklist: NewMapBlacklist(), @@ -348,10 +362,15 @@ func (p *PubSub) processLoop(ctx context.Context) { out = append(out, t) } treq.resp <- out - case sub := <-p.cancelCh: + + case r := <-p.cancelRelayCh: + p.handleRemoveRelay(r) + case sub := <-p.cancelSubCh: p.handleRemoveSubscription(sub) case sub := <-p.addSub: p.handleAddSubscription(sub) + case r := <-p.addRelay: + p.handleAddRelay(r) case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -411,10 +430,28 @@ func (p *PubSub) processLoop(ctx context.Context) { } } -// handleRemoveSubscription removes Subscription sub from bookeeping. -// If this was the last Subscription for a given topic, it will also announce +// handleRemoveRelay removes Relay r from bookeeping. +// If it was the last Relay for a given topic, it will also announce // that this node is not subscribing to this topic anymore. // Only called from processLoop. +func (p *PubSub) handleRemoveRelay(r *Relay) { + relays := p.relayTopics[r.topic] + + if relays == nil { + return + } + + delete(relays, r) + + if len(relays) == 0 { + delete(p.relayTopics, r.topic) + p.announce(r.topic, false) + p.rt.Leave(r.topic) + } +} + +// handleRemoveSubscription removes Subscription sub from bookeeping. +// It then calls for the removal of the associated Relay. func (p *PubSub) handleRemoveSubscription(sub *Subscription) { subs := p.myTopics[sub.topic] @@ -428,25 +465,42 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.myTopics, sub.topic) - p.announce(sub.topic, false) - p.rt.Leave(sub.topic) } + p.handleRemoveRelay(sub.relay) } -// handleAddSubscription adds a Subscription for a particular topic. If it is -// the first Subscription for the topic, it will announce that this node +// handleAddRelay adds a Relay for a particular topic. If it is +// the first Relay for the topic, it will announce that this node // subscribes to the topic. // Only called from processLoop. -func (p *PubSub) handleAddSubscription(req *addSubReq) { - sub := req.sub - subs := p.myTopics[sub.topic] +func (p *PubSub) handleAddRelay(req *addRelayReq) { + relay := req.r + relays := p.relayTopics[relay.topic] // announce we want this topic - if len(subs) == 0 { - p.announce(sub.topic, true) - p.rt.Join(sub.topic) + if len(relays) == 0 { + p.announce(relay.topic, true) + p.rt.Join(relay.topic) } + // make new if not there + if relays == nil { + p.relayTopics[relay.topic] = make(map[*Relay]struct{}) + } + + relay.cancelCh = p.cancelRelayCh + + p.relayTopics[relay.topic][relay] = struct{}{} + + req.resp <- relay +} + +// handleAddSubscription adds a Subscription for a particular topic. +// Only called from processLoop. +func (p *PubSub) handleAddSubscription(req *addSubReq) { + sub := req.sub + subs := p.myTopics[sub.topic] + // make new if not there if subs == nil { p.myTopics[sub.topic] = make(map[*Subscription]struct{}) @@ -458,7 +512,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { for p := range tmap { sub.evtLog[p] = PeerJoin } - sub.cancelCh = p.cancelCh + sub.cancelCh = p.cancelSubCh p.myTopics[sub.topic][sub] = struct{}{} @@ -570,6 +624,21 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } +// relayForMessage returns whether we relay for one of the topics +// of a given message +func (p *PubSub) relayForMessage(msg *pb.Message) bool { + if len(p.relayTopics) == 0 { + return false + } + + for _, t := range msg.GetTopicIDs() { + if _, ok := p.relayTopics[t]; ok { + return true + } + } + return false +} + func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if subs, ok := p.myTopics[topic]; ok { for s := range subs { @@ -611,8 +680,8 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { } for _, pmsg := range rpc.GetPublish() { - if !p.subscribedToMsg(pmsg) { - log.Warning("received message we didn't subscribe to. Dropping.") + if !(p.subscribedToMsg(pmsg) || p.relayForMessage(pmsg)) { + log.Warning("received message we didn't subscribe to or relay for. Dropping.") continue } @@ -673,6 +742,11 @@ type addSubReq struct { resp chan *Subscription } +type addRelayReq struct { + r *Relay + resp chan *Relay +} + type SubOpt func(sub *Subscription) error // Subscribe returns a new Subscription for the given topic. @@ -684,6 +758,50 @@ func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) return p.SubscribeByTopicDescriptor(&td, opts...) } +type RelayOpt func(r *Relay) error + +// Join returns a new Relay for the given topic which will enable us to +// propagate messages we receive to other peers without having to 'consume'/'subscribe' them. +// For now, we ONLY support Relay without subscription. +// We do NOT allow subscription without Relay. +// Note that Join is not an instanteneous operation. It may take some time +// before it is processed by the pubsub main loop and propagated to our peers. +func (p *PubSub) Join(topic string, opts ...RelayOpt) (*Relay, error) { + td := pb.TopicDescriptor{Name: &topic} + + return p.JoinByTopicDescriptor(&td, opts...) +} + +func (p *PubSub) JoinByTopicDescriptor(td *pb.TopicDescriptor, opts ...RelayOpt) (*Relay, error) { + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("auth mode not yet supported for relay") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("encryption mode not yet supported for relay") + } + + relay := &Relay{ + topic: td.GetName(), + ctx: p.ctx, + } + + for _, opt := range opts { + err := opt(relay) + if err != nil { + return nil, err + } + } + + out := make(chan *Relay, 1) + p.addRelay <- &addRelayReq{ + r: relay, + resp: out, + } + + return <-out, nil +} + // SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor. func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) { if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { @@ -694,7 +812,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO return nil, fmt.Errorf("encryption mode not yet supported") } + relay, err := p.Join(td.GetName()) + if err != nil { + return nil, errors.Wrap(err, "failed to create relay for subscription") + } + sub := &Subscription{ + relay: relay, topic: td.GetName(), ctx: p.ctx, diff --git a/relay.go b/relay.go new file mode 100644 index 00000000..ab65f0e3 --- /dev/null +++ b/relay.go @@ -0,0 +1,20 @@ +package pubsub + +import "context" + +type Relay struct { + topic string + cancelCh chan<- *Relay + ctx context.Context +} + +func (r *Relay) Topic() string { + return r.topic +} + +func (r *Relay) Cancel() { + select { + case r.cancelCh <- r: + case <-r.ctx.Done(): + } +} diff --git a/subscription.go b/subscription.go index b3ddf836..2a604245 100644 --- a/subscription.go +++ b/subscription.go @@ -14,6 +14,8 @@ const ( ) type Subscription struct { + relay *Relay + topic string ch chan *Message cancelCh chan<- *Subscription