Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Validators #55

Merged
merged 27 commits into from
Jan 24, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
647bb98
optionally allow caller to validate messages
Nov 8, 2017
930f264
typedef subscription options and fix typo
Nov 16, 2017
1945f89
log when validator discards message
Nov 16, 2017
7dd4e0b
vet
Nov 16, 2017
197a598
ungxify
Nov 16, 2017
89e6a06
better tests for validation
Nov 22, 2017
02877cd
complete validator functions
Nov 22, 2017
6e8b9f2
fix timeout
Nov 23, 2017
fe09d1e
make validator timeout configurable
Nov 23, 2017
88274db
make maximum concurrency configurable, split loop
Dec 16, 2017
4241241
fix dangling maxConcurrency reference
vyzo Jan 13, 2018
d2f6a00
WithValidator and WithValidatorTimeout are subscription options
vyzo Jan 13, 2018
982c4de
per subscription validation throttle and more efficient dispatch logic
vyzo Jan 13, 2018
fba445b
code cosmetics
vyzo Jan 13, 2018
c95ed28
add validation context for cancelation on aborts
vyzo Jan 13, 2018
5ef13c7
don't always spawn a goroutine for sending a new message
vyzo Jan 13, 2018
bf2151b
the sendMsg channel should yield pointers for consistency
vyzo Jan 13, 2018
856a25c
WithMaxConcurrency is WithValidatorConcurrency
vyzo Jan 13, 2018
edcb251
install global validation throttle, use reasonable defaults.
vyzo Jan 13, 2018
473a5d2
sendMsg should have a buffer
vyzo Jan 13, 2018
f6081fb
pushMsg should just call maybePublishMessage when it doesn't need val…
vyzo Jan 13, 2018
145a84a
use a single channel for all validation results
vyzo Jan 13, 2018
f1be0f1
don't spawn an extra goroutine for the validator context
vyzo Jan 13, 2018
cb365a5
remove faulty tests
vyzo Jan 14, 2018
fceb00d
improved comment about global validation throttle
vyzo Jan 14, 2018
bbdec3f
implement per topic validators
vyzo Jan 18, 2018
3f4fc21
fix comment, subscriptions don't have validators any more.
vyzo Jan 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 190 additions & 22 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import (
timecache "github.com/whyrusleeping/timecache"
)

const ID = protocol.ID("/floodsub/1.0.0")
const (
ID = protocol.ID("/floodsub/1.0.0")
defaultMaxConcurrency = 10
defaultValidateTimeout = 150 * time.Millisecond
)

var log = logging.Logger("floodsub")

Expand Down Expand Up @@ -54,6 +58,9 @@ type PubSub struct {
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}

// sendMsg handles messages that have been validated
sendMsg chan sendReq
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking that this should be a chan *sendReq for consistency with the other channels.


peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache

Expand All @@ -78,8 +85,10 @@ type RPC struct {
from peer.ID
}

type Option func(*PubSub) error

// NewFloodSub returns a new FloodSub management object
func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could avoid the interface change if we don't use any options -- do we want this for future proofing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it, it's a nice interface; also we may want to pass an option for global validation throttle.

ps := &PubSub{
host: h,
ctx: ctx,
Expand All @@ -91,19 +100,27 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan sendReq),
myTopics: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
seenMessages: timecache.NewTimeCache(time.Second * 30),
counter: uint64(time.Now().UnixNano()),
}

for _, opt := range opts {
err := opt(ps)
if err != nil {
return nil, err
}
}

h.SetStreamHandler(ID, ps.handleNewStream)
h.Network().Notify((*PubSubNotif)(ps))

go ps.processLoop(ctx)

return ps
return ps, nil
}

// processLoop handles all inputs arriving on the channels
Expand Down Expand Up @@ -176,7 +193,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue
}
case msg := <-p.publish:
p.maybePublishMessage(p.host.ID(), msg.Message)
subs := p.getSubscriptions(msg)
p.pushMsg(subs, p.host.ID(), msg)

case req := <-p.sendMsg:
p.maybePublishMessage(req.from, req.msg.Message)

case <-ctx.Done():
log.Info("pubsub processloop shutting down")
return
Expand Down Expand Up @@ -210,24 +232,22 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
// subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
subs := p.myTopics[req.topic]
sub := req.sub
subs := p.myTopics[sub.topic]

// announce we want this topic
if len(subs) == 0 {
p.announce(req.topic, true)
p.announce(sub.topic, true)
}

// make new if not there
if subs == nil {
p.myTopics[req.topic] = make(map[*Subscription]struct{})
subs = p.myTopics[req.topic]
p.myTopics[sub.topic] = make(map[*Subscription]struct{})
subs = p.myTopics[sub.topic]
}

sub := &Subscription{
ch: make(chan *Message, 32),
topic: req.topic,
cancelCh: p.cancelCh,
}
sub.ch = make(chan *Message, 32)
sub.cancelCh = p.cancelCh

p.myTopics[sub.topic][sub] = struct{}{}

Expand Down Expand Up @@ -314,8 +334,11 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
continue
}

p.maybePublishMessage(rpc.from, pmsg)
msg := &Message{pmsg}
subs := p.getSubscriptions(msg)
p.pushMsg(subs, rpc.from, msg)
}

return nil
}

Expand All @@ -324,6 +347,85 @@ func msgID(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// pushMsg pushes a message to a number of subscriptions, performing validation
// as necessary
func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) {
// we perform validation if _any_ of the subscriptions has a validator
// because the message is sent once for all topics
needval := false
for _, sub := range subs {
if sub.validate != nil {
needval = true
break
}
}

if needval {
// validation is asynchronous
// XXX vyzo: do we want a global validation throttle here?
go p.validate(subs, src, msg)
return
}

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrm... spawning a goroutine for each message being sent is a bit wonky. Maybe:

select {
case p.sendMsg <- .....:
default:
  go func() {
    p.sendMsg <- ....
  }()
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, we only have to spawn a new goroutine if it's a publish, in which case the push happens within the event loop.
I'll update.

Copy link
Collaborator Author

@vyzo vyzo Jan 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are always in the event loop actually. I added a buffer to sendMsg so that we almost always avoid the goroutine.

p.sendMsg <- sendReq{
from: src,
msg: msg,
}
}()
}

// validate performs validation and only sends the message if all validators succeed
func (p *PubSub) validate(subs []*Subscription, src peer.ID, msg *Message) {
ctx, cancel := context.WithCancel(p.ctx)
defer cancel()

results := make([]chan bool, 0, len(subs))
throttle := false

loop:
for _, sub := range subs {
if sub.validate == nil {
continue
}

rch := make(chan bool, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just use a single channel and count results from it. Allocating channels isn't that expensive but it's certainly not free.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough; it was really convenient to write this way though :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright, implemented!

results = append(results, rch)

select {
case sub.validateThrottle <- struct{}{}:
go func(sub *Subscription, rch chan bool) {
rch <- sub.validateMsg(ctx, msg)
<-sub.validateThrottle
}(sub, rch)

default:
log.Debugf("validation throttled for topic %s", sub.topic)
throttle = true
break loop
}
}

if throttle {
log.Warningf("message validation throttled; dropping message from %s", src)
return
}

for _, rch := range results {
valid := <-rch
if !valid {
log.Warningf("message validation failed; dropping message from %s", src)
return
}
}

// all validators were successful, send the message
p.sendMsg <- sendReq{
from: src,
msg: msg,
}
}

func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
id := msgID(pmsg)
if p.seenMessage(id) {
Expand All @@ -348,7 +450,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
continue
}

for p, _ := range tmap {
for p := range tmap {
tosend[p] = struct{}{}
}
}
Expand All @@ -375,20 +477,64 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
return nil
}

// getSubscriptions returns all subscriptions the would receive the given message.
func (p *PubSub) getSubscriptions(msg *Message) []*Subscription {
var subs []*Subscription

for _, topic := range msg.GetTopicIDs() {
tSubs, ok := p.myTopics[topic]
if !ok {
continue
}

for sub := range tSubs {
subs = append(subs, sub)
}
}

return subs
}

type addSubReq struct {
topic string
resp chan *Subscription
sub *Subscription
resp chan *Subscription
}

type SubOpt func(*Subscription) error
type Validator func(context.Context, *Message) bool

// WithValidator is an option that can be supplied to Subscribe. The argument is a function that returns whether or not a given message should be propagated further.
func WithValidator(validate Validator) SubOpt {
return func(sub *Subscription) error {
sub.validate = validate
return nil
}
}

// WithValidatorTimeout is an option that can be supplied to Subscribe. The argument is a duration after which long-running validators are canceled.
func WithValidatorTimeout(timeout time.Duration) SubOpt {
return func(sub *Subscription) error {
sub.validateTimeout = timeout
return nil
}
}

func WithMaxConcurrency(n int) SubOpt {
return func(sub *Subscription) error {
sub.validateThrottle = make(chan struct{}, n)
return nil
}
}

// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string) (*Subscription, error) {
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
td := pb.TopicDescriptor{Name: &topic}

return p.SubscribeByTopicDescriptor(&td)
return p.SubscribeByTopicDescriptor(&td, opts...)
}

// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscription, error) {
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("auth mode not yet supported")
}
Expand All @@ -397,10 +543,26 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor) (*Subscripti
return nil, fmt.Errorf("encryption mode not yet supported")
}

sub := &Subscription{
topic: td.GetName(),
validateTimeout: defaultValidateTimeout,
}

for _, opt := range opts {
err := opt(sub)
if err != nil {
return nil, err
}
}

if sub.validate != nil && sub.validateThrottle == nil {
sub.validateThrottle = make(chan struct{}, defaultMaxConcurrency)
}

out := make(chan *Subscription, 1)
p.addSub <- &addSubReq{
topic: td.GetName(),
resp: out,
sub: sub,
resp: out,
}

return <-out, nil
Expand Down Expand Up @@ -439,6 +601,12 @@ type listPeerReq struct {
topic string
}

// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done.
type sendReq struct {
from peer.ID
msg *Message
}

// ListPeers returns a list of peers we are connected to.
func (p *PubSub) ListPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
Expand Down
Loading